aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Baines <mail@cbaines.net>2024-11-19 18:43:43 +0000
committerChristopher Baines <mail@cbaines.net>2024-12-16 09:18:12 +0000
commit2f39c58d6ca72cd869ba69e03d639f36d497e9a8 (patch)
tree78caa9af60eb057eda659e969207a8457e2fa3f2
downloadknots-2f39c58d6ca72cd869ba69e03d639f36d497e9a8.tar
knots-2f39c58d6ca72cd869ba69e03d639f36d497e9a8.tar.gz
Initial commit
-rw-r--r--.gitignore19
-rw-r--r--COPYING674
-rw-r--r--Makefile.am17
-rw-r--r--README4
-rwxr-xr-xbootstrap3
-rw-r--r--configure.ac19
-rw-r--r--guile.am21
-rw-r--r--guix-dev.scm50
-rw-r--r--knots.scm21
-rw-r--r--knots/non-blocking.scm63
-rw-r--r--knots/parallelism.scm197
-rw-r--r--knots/promise.scm78
-rw-r--r--knots/queue.scm47
-rw-r--r--knots/resource-pool.scm485
-rw-r--r--knots/timeout.scm200
-rw-r--r--knots/web-server.scm263
-rw-r--r--knots/worker-threads.scm577
-rw-r--r--pre-inst-env.in13
-rw-r--r--tests.scm28
-rw-r--r--tests/non-blocking.scm31
-rw-r--r--tests/parallelism.scm15
-rw-r--r--tests/promise.scm20
-rw-r--r--tests/queue.scm22
-rw-r--r--tests/resource-pool.scm18
-rw-r--r--tests/timeout.scm22
-rw-r--r--tests/web-server.scm30
-rw-r--r--tests/worker-threads.scm32
27 files changed, 2969 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..10520ac
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,19 @@
+*.go
+Makefile.in
+Makefile
+aclocal.m4
+autom4te.cache
+config.log
+config.status
+configure
+
+build-aux/install-sh
+build-aux/missing
+
+*.log
+tests/*.log
+tests/*.trs
+
+pre-inst-env
+
+.local.envrc
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..94a9ed0
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,674 @@
+ GNU GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The GNU General Public License is a free, copyleft license for
+software and other kinds of works.
+
+ The licenses for most software and other practical works are designed
+to take away your freedom to share and change the works. By contrast,
+the GNU General Public License is intended to guarantee your freedom to
+share and change all versions of a program--to make sure it remains free
+software for all its users. We, the Free Software Foundation, use the
+GNU General Public License for most of our software; it applies also to
+any other work released this way by its authors. You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+them if you wish), that you receive source code or can get it if you
+want it, that you can change the software or use pieces of it in new
+free programs, and that you know you can do these things.
+
+ To protect your rights, we need to prevent others from denying you
+these rights or asking you to surrender the rights. Therefore, you have
+certain responsibilities if you distribute copies of the software, or if
+you modify it: responsibilities to respect the freedom of others.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must pass on to the recipients the same
+freedoms that you received. You must make sure that they, too, receive
+or can get the source code. And you must show them these terms so they
+know their rights.
+
+ Developers that use the GNU GPL protect your rights with two steps:
+(1) assert copyright on the software, and (2) offer you this License
+giving you legal permission to copy, distribute and/or modify it.
+
+ For the developers' and authors' protection, the GPL clearly explains
+that there is no warranty for this free software. For both users' and
+authors' sake, the GPL requires that modified versions be marked as
+changed, so that their problems will not be attributed erroneously to
+authors of previous versions.
+
+ Some devices are designed to deny users access to install or run
+modified versions of the software inside them, although the manufacturer
+can do so. This is fundamentally incompatible with the aim of
+protecting users' freedom to change the software. The systematic
+pattern of such abuse occurs in the area of products for individuals to
+use, which is precisely where it is most unacceptable. Therefore, we
+have designed this version of the GPL to prohibit the practice for those
+products. If such problems arise substantially in other domains, we
+stand ready to extend this provision to those domains in future versions
+of the GPL, as needed to protect the freedom of users.
+
+ Finally, every program is threatened constantly by software patents.
+States should not allow patents to restrict development and use of
+software on general-purpose computers, but in those that do, we wish to
+avoid the special danger that patents applied to a free program could
+make it effectively proprietary. To prevent this, the GPL assures that
+patents cannot be used to render the program non-free.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ TERMS AND CONDITIONS
+
+ 0. Definitions.
+
+ "This License" refers to version 3 of the GNU General Public License.
+
+ "Copyright" also means copyright-like laws that apply to other kinds of
+works, such as semiconductor masks.
+
+ "The Program" refers to any copyrightable work licensed under this
+License. Each licensee is addressed as "you". "Licensees" and
+"recipients" may be individuals or organizations.
+
+ To "modify" a work means to copy from or adapt all or part of the work
+in a fashion requiring copyright permission, other than the making of an
+exact copy. The resulting work is called a "modified version" of the
+earlier work or a work "based on" the earlier work.
+
+ A "covered work" means either the unmodified Program or a work based
+on the Program.
+
+ To "propagate" a work means to do anything with it that, without
+permission, would make you directly or secondarily liable for
+infringement under applicable copyright law, except executing it on a
+computer or modifying a private copy. Propagation includes copying,
+distribution (with or without modification), making available to the
+public, and in some countries other activities as well.
+
+ To "convey" a work means any kind of propagation that enables other
+parties to make or receive copies. Mere interaction with a user through
+a computer network, with no transfer of a copy, is not conveying.
+
+ An interactive user interface displays "Appropriate Legal Notices"
+to the extent that it includes a convenient and prominently visible
+feature that (1) displays an appropriate copyright notice, and (2)
+tells the user that there is no warranty for the work (except to the
+extent that warranties are provided), that licensees may convey the
+work under this License, and how to view a copy of this License. If
+the interface presents a list of user commands or options, such as a
+menu, a prominent item in the list meets this criterion.
+
+ 1. Source Code.
+
+ The "source code" for a work means the preferred form of the work
+for making modifications to it. "Object code" means any non-source
+form of a work.
+
+ A "Standard Interface" means an interface that either is an official
+standard defined by a recognized standards body, or, in the case of
+interfaces specified for a particular programming language, one that
+is widely used among developers working in that language.
+
+ The "System Libraries" of an executable work include anything, other
+than the work as a whole, that (a) is included in the normal form of
+packaging a Major Component, but which is not part of that Major
+Component, and (b) serves only to enable use of the work with that
+Major Component, or to implement a Standard Interface for which an
+implementation is available to the public in source code form. A
+"Major Component", in this context, means a major essential component
+(kernel, window system, and so on) of the specific operating system
+(if any) on which the executable work runs, or a compiler used to
+produce the work, or an object code interpreter used to run it.
+
+ The "Corresponding Source" for a work in object code form means all
+the source code needed to generate, install, and (for an executable
+work) run the object code and to modify the work, including scripts to
+control those activities. However, it does not include the work's
+System Libraries, or general-purpose tools or generally available free
+programs which are used unmodified in performing those activities but
+which are not part of the work. For example, Corresponding Source
+includes interface definition files associated with source files for
+the work, and the source code for shared libraries and dynamically
+linked subprograms that the work is specifically designed to require,
+such as by intimate data communication or control flow between those
+subprograms and other parts of the work.
+
+ The Corresponding Source need not include anything that users
+can regenerate automatically from other parts of the Corresponding
+Source.
+
+ The Corresponding Source for a work in source code form is that
+same work.
+
+ 2. Basic Permissions.
+
+ All rights granted under this License are granted for the term of
+copyright on the Program, and are irrevocable provided the stated
+conditions are met. This License explicitly affirms your unlimited
+permission to run the unmodified Program. The output from running a
+covered work is covered by this License only if the output, given its
+content, constitutes a covered work. This License acknowledges your
+rights of fair use or other equivalent, as provided by copyright law.
+
+ You may make, run and propagate covered works that you do not
+convey, without conditions so long as your license otherwise remains
+in force. You may convey covered works to others for the sole purpose
+of having them make modifications exclusively for you, or provide you
+with facilities for running those works, provided that you comply with
+the terms of this License in conveying all material for which you do
+not control copyright. Those thus making or running the covered works
+for you must do so exclusively on your behalf, under your direction
+and control, on terms that prohibit them from making any copies of
+your copyrighted material outside their relationship with you.
+
+ Conveying under any other circumstances is permitted solely under
+the conditions stated below. Sublicensing is not allowed; section 10
+makes it unnecessary.
+
+ 3. Protecting Users' Legal Rights From Anti-Circumvention Law.
+
+ No covered work shall be deemed part of an effective technological
+measure under any applicable law fulfilling obligations under article
+11 of the WIPO copyright treaty adopted on 20 December 1996, or
+similar laws prohibiting or restricting circumvention of such
+measures.
+
+ When you convey a covered work, you waive any legal power to forbid
+circumvention of technological measures to the extent such circumvention
+is effected by exercising rights under this License with respect to
+the covered work, and you disclaim any intention to limit operation or
+modification of the work as a means of enforcing, against the work's
+users, your or third parties' legal rights to forbid circumvention of
+technological measures.
+
+ 4. Conveying Verbatim Copies.
+
+ You may convey verbatim copies of the Program's source code as you
+receive it, in any medium, provided that you conspicuously and
+appropriately publish on each copy an appropriate copyright notice;
+keep intact all notices stating that this License and any
+non-permissive terms added in accord with section 7 apply to the code;
+keep intact all notices of the absence of any warranty; and give all
+recipients a copy of this License along with the Program.
+
+ You may charge any price or no price for each copy that you convey,
+and you may offer support or warranty protection for a fee.
+
+ 5. Conveying Modified Source Versions.
+
+ You may convey a work based on the Program, or the modifications to
+produce it from the Program, in the form of source code under the
+terms of section 4, provided that you also meet all of these conditions:
+
+ a) The work must carry prominent notices stating that you modified
+ it, and giving a relevant date.
+
+ b) The work must carry prominent notices stating that it is
+ released under this License and any conditions added under section
+ 7. This requirement modifies the requirement in section 4 to
+ "keep intact all notices".
+
+ c) You must license the entire work, as a whole, under this
+ License to anyone who comes into possession of a copy. This
+ License will therefore apply, along with any applicable section 7
+ additional terms, to the whole of the work, and all its parts,
+ regardless of how they are packaged. This License gives no
+ permission to license the work in any other way, but it does not
+ invalidate such permission if you have separately received it.
+
+ d) If the work has interactive user interfaces, each must display
+ Appropriate Legal Notices; however, if the Program has interactive
+ interfaces that do not display Appropriate Legal Notices, your
+ work need not make them do so.
+
+ A compilation of a covered work with other separate and independent
+works, which are not by their nature extensions of the covered work,
+and which are not combined with it such as to form a larger program,
+in or on a volume of a storage or distribution medium, is called an
+"aggregate" if the compilation and its resulting copyright are not
+used to limit the access or legal rights of the compilation's users
+beyond what the individual works permit. Inclusion of a covered work
+in an aggregate does not cause this License to apply to the other
+parts of the aggregate.
+
+ 6. Conveying Non-Source Forms.
+
+ You may convey a covered work in object code form under the terms
+of sections 4 and 5, provided that you also convey the
+machine-readable Corresponding Source under the terms of this License,
+in one of these ways:
+
+ a) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by the
+ Corresponding Source fixed on a durable physical medium
+ customarily used for software interchange.
+
+ b) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by a
+ written offer, valid for at least three years and valid for as
+ long as you offer spare parts or customer support for that product
+ model, to give anyone who possesses the object code either (1) a
+ copy of the Corresponding Source for all the software in the
+ product that is covered by this License, on a durable physical
+ medium customarily used for software interchange, for a price no
+ more than your reasonable cost of physically performing this
+ conveying of source, or (2) access to copy the
+ Corresponding Source from a network server at no charge.
+
+ c) Convey individual copies of the object code with a copy of the
+ written offer to provide the Corresponding Source. This
+ alternative is allowed only occasionally and noncommercially, and
+ only if you received the object code with such an offer, in accord
+ with subsection 6b.
+
+ d) Convey the object code by offering access from a designated
+ place (gratis or for a charge), and offer equivalent access to the
+ Corresponding Source in the same way through the same place at no
+ further charge. You need not require recipients to copy the
+ Corresponding Source along with the object code. If the place to
+ copy the object code is a network server, the Corresponding Source
+ may be on a different server (operated by you or a third party)
+ that supports equivalent copying facilities, provided you maintain
+ clear directions next to the object code saying where to find the
+ Corresponding Source. Regardless of what server hosts the
+ Corresponding Source, you remain obligated to ensure that it is
+ available for as long as needed to satisfy these requirements.
+
+ e) Convey the object code using peer-to-peer transmission, provided
+ you inform other peers where the object code and Corresponding
+ Source of the work are being offered to the general public at no
+ charge under subsection 6d.
+
+ A separable portion of the object code, whose source code is excluded
+from the Corresponding Source as a System Library, need not be
+included in conveying the object code work.
+
+ A "User Product" is either (1) a "consumer product", which means any
+tangible personal property which is normally used for personal, family,
+or household purposes, or (2) anything designed or sold for incorporation
+into a dwelling. In determining whether a product is a consumer product,
+doubtful cases shall be resolved in favor of coverage. For a particular
+product received by a particular user, "normally used" refers to a
+typical or common use of that class of product, regardless of the status
+of the particular user or of the way in which the particular user
+actually uses, or expects or is expected to use, the product. A product
+is a consumer product regardless of whether the product has substantial
+commercial, industrial or non-consumer uses, unless such uses represent
+the only significant mode of use of the product.
+
+ "Installation Information" for a User Product means any methods,
+procedures, authorization keys, or other information required to install
+and execute modified versions of a covered work in that User Product from
+a modified version of its Corresponding Source. The information must
+suffice to ensure that the continued functioning of the modified object
+code is in no case prevented or interfered with solely because
+modification has been made.
+
+ If you convey an object code work under this section in, or with, or
+specifically for use in, a User Product, and the conveying occurs as
+part of a transaction in which the right of possession and use of the
+User Product is transferred to the recipient in perpetuity or for a
+fixed term (regardless of how the transaction is characterized), the
+Corresponding Source conveyed under this section must be accompanied
+by the Installation Information. But this requirement does not apply
+if neither you nor any third party retains the ability to install
+modified object code on the User Product (for example, the work has
+been installed in ROM).
+
+ The requirement to provide Installation Information does not include a
+requirement to continue to provide support service, warranty, or updates
+for a work that has been modified or installed by the recipient, or for
+the User Product in which it has been modified or installed. Access to a
+network may be denied when the modification itself materially and
+adversely affects the operation of the network or violates the rules and
+protocols for communication across the network.
+
+ Corresponding Source conveyed, and Installation Information provided,
+in accord with this section must be in a format that is publicly
+documented (and with an implementation available to the public in
+source code form), and must require no special password or key for
+unpacking, reading or copying.
+
+ 7. Additional Terms.
+
+ "Additional permissions" are terms that supplement the terms of this
+License by making exceptions from one or more of its conditions.
+Additional permissions that are applicable to the entire Program shall
+be treated as though they were included in this License, to the extent
+that they are valid under applicable law. If additional permissions
+apply only to part of the Program, that part may be used separately
+under those permissions, but the entire Program remains governed by
+this License without regard to the additional permissions.
+
+ When you convey a copy of a covered work, you may at your option
+remove any additional permissions from that copy, or from any part of
+it. (Additional permissions may be written to require their own
+removal in certain cases when you modify the work.) You may place
+additional permissions on material, added by you to a covered work,
+for which you have or can give appropriate copyright permission.
+
+ Notwithstanding any other provision of this License, for material you
+add to a covered work, you may (if authorized by the copyright holders of
+that material) supplement the terms of this License with terms:
+
+ a) Disclaiming warranty or limiting liability differently from the
+ terms of sections 15 and 16 of this License; or
+
+ b) Requiring preservation of specified reasonable legal notices or
+ author attributions in that material or in the Appropriate Legal
+ Notices displayed by works containing it; or
+
+ c) Prohibiting misrepresentation of the origin of that material, or
+ requiring that modified versions of such material be marked in
+ reasonable ways as different from the original version; or
+
+ d) Limiting the use for publicity purposes of names of licensors or
+ authors of the material; or
+
+ e) Declining to grant rights under trademark law for use of some
+ trade names, trademarks, or service marks; or
+
+ f) Requiring indemnification of licensors and authors of that
+ material by anyone who conveys the material (or modified versions of
+ it) with contractual assumptions of liability to the recipient, for
+ any liability that these contractual assumptions directly impose on
+ those licensors and authors.
+
+ All other non-permissive additional terms are considered "further
+restrictions" within the meaning of section 10. If the Program as you
+received it, or any part of it, contains a notice stating that it is
+governed by this License along with a term that is a further
+restriction, you may remove that term. If a license document contains
+a further restriction but permits relicensing or conveying under this
+License, you may add to a covered work material governed by the terms
+of that license document, provided that the further restriction does
+not survive such relicensing or conveying.
+
+ If you add terms to a covered work in accord with this section, you
+must place, in the relevant source files, a statement of the
+additional terms that apply to those files, or a notice indicating
+where to find the applicable terms.
+
+ Additional terms, permissive or non-permissive, may be stated in the
+form of a separately written license, or stated as exceptions;
+the above requirements apply either way.
+
+ 8. Termination.
+
+ You may not propagate or modify a covered work except as expressly
+provided under this License. Any attempt otherwise to propagate or
+modify it is void, and will automatically terminate your rights under
+this License (including any patent licenses granted under the third
+paragraph of section 11).
+
+ However, if you cease all violation of this License, then your
+license from a particular copyright holder is reinstated (a)
+provisionally, unless and until the copyright holder explicitly and
+finally terminates your license, and (b) permanently, if the copyright
+holder fails to notify you of the violation by some reasonable means
+prior to 60 days after the cessation.
+
+ Moreover, your license from a particular copyright holder is
+reinstated permanently if the copyright holder notifies you of the
+violation by some reasonable means, this is the first time you have
+received notice of violation of this License (for any work) from that
+copyright holder, and you cure the violation prior to 30 days after
+your receipt of the notice.
+
+ Termination of your rights under this section does not terminate the
+licenses of parties who have received copies or rights from you under
+this License. If your rights have been terminated and not permanently
+reinstated, you do not qualify to receive new licenses for the same
+material under section 10.
+
+ 9. Acceptance Not Required for Having Copies.
+
+ You are not required to accept this License in order to receive or
+run a copy of the Program. Ancillary propagation of a covered work
+occurring solely as a consequence of using peer-to-peer transmission
+to receive a copy likewise does not require acceptance. However,
+nothing other than this License grants you permission to propagate or
+modify any covered work. These actions infringe copyright if you do
+not accept this License. Therefore, by modifying or propagating a
+covered work, you indicate your acceptance of this License to do so.
+
+ 10. Automatic Licensing of Downstream Recipients.
+
+ Each time you convey a covered work, the recipient automatically
+receives a license from the original licensors, to run, modify and
+propagate that work, subject to this License. You are not responsible
+for enforcing compliance by third parties with this License.
+
+ An "entity transaction" is a transaction transferring control of an
+organization, or substantially all assets of one, or subdividing an
+organization, or merging organizations. If propagation of a covered
+work results from an entity transaction, each party to that
+transaction who receives a copy of the work also receives whatever
+licenses to the work the party's predecessor in interest had or could
+give under the previous paragraph, plus a right to possession of the
+Corresponding Source of the work from the predecessor in interest, if
+the predecessor has it or can get it with reasonable efforts.
+
+ You may not impose any further restrictions on the exercise of the
+rights granted or affirmed under this License. For example, you may
+not impose a license fee, royalty, or other charge for exercise of
+rights granted under this License, and you may not initiate litigation
+(including a cross-claim or counterclaim in a lawsuit) alleging that
+any patent claim is infringed by making, using, selling, offering for
+sale, or importing the Program or any portion of it.
+
+ 11. Patents.
+
+ A "contributor" is a copyright holder who authorizes use under this
+License of the Program or a work on which the Program is based. The
+work thus licensed is called the contributor's "contributor version".
+
+ A contributor's "essential patent claims" are all patent claims
+owned or controlled by the contributor, whether already acquired or
+hereafter acquired, that would be infringed by some manner, permitted
+by this License, of making, using, or selling its contributor version,
+but do not include claims that would be infringed only as a
+consequence of further modification of the contributor version. For
+purposes of this definition, "control" includes the right to grant
+patent sublicenses in a manner consistent with the requirements of
+this License.
+
+ Each contributor grants you a non-exclusive, worldwide, royalty-free
+patent license under the contributor's essential patent claims, to
+make, use, sell, offer for sale, import and otherwise run, modify and
+propagate the contents of its contributor version.
+
+ In the following three paragraphs, a "patent license" is any express
+agreement or commitment, however denominated, not to enforce a patent
+(such as an express permission to practice a patent or covenant not to
+sue for patent infringement). To "grant" such a patent license to a
+party means to make such an agreement or commitment not to enforce a
+patent against the party.
+
+ If you convey a covered work, knowingly relying on a patent license,
+and the Corresponding Source of the work is not available for anyone
+to copy, free of charge and under the terms of this License, through a
+publicly available network server or other readily accessible means,
+then you must either (1) cause the Corresponding Source to be so
+available, or (2) arrange to deprive yourself of the benefit of the
+patent license for this particular work, or (3) arrange, in a manner
+consistent with the requirements of this License, to extend the patent
+license to downstream recipients. "Knowingly relying" means you have
+actual knowledge that, but for the patent license, your conveying the
+covered work in a country, or your recipient's use of the covered work
+in a country, would infringe one or more identifiable patents in that
+country that you have reason to believe are valid.
+
+ If, pursuant to or in connection with a single transaction or
+arrangement, you convey, or propagate by procuring conveyance of, a
+covered work, and grant a patent license to some of the parties
+receiving the covered work authorizing them to use, propagate, modify
+or convey a specific copy of the covered work, then the patent license
+you grant is automatically extended to all recipients of the covered
+work and works based on it.
+
+ A patent license is "discriminatory" if it does not include within
+the scope of its coverage, prohibits the exercise of, or is
+conditioned on the non-exercise of one or more of the rights that are
+specifically granted under this License. You may not convey a covered
+work if you are a party to an arrangement with a third party that is
+in the business of distributing software, under which you make payment
+to the third party based on the extent of your activity of conveying
+the work, and under which the third party grants, to any of the
+parties who would receive the covered work from you, a discriminatory
+patent license (a) in connection with copies of the covered work
+conveyed by you (or copies made from those copies), or (b) primarily
+for and in connection with specific products or compilations that
+contain the covered work, unless you entered into that arrangement,
+or that patent license was granted, prior to 28 March 2007.
+
+ Nothing in this License shall be construed as excluding or limiting
+any implied license or other defenses to infringement that may
+otherwise be available to you under applicable patent law.
+
+ 12. No Surrender of Others' Freedom.
+
+ If conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot convey a
+covered work so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you may
+not convey it at all. For example, if you agree to terms that obligate you
+to collect a royalty for further conveying from those to whom you convey
+the Program, the only way you could satisfy both those terms and this
+License would be to refrain entirely from conveying the Program.
+
+ 13. Use with the GNU Affero General Public License.
+
+ Notwithstanding any other provision of this License, you have
+permission to link or combine any covered work with a work licensed
+under version 3 of the GNU Affero General Public License into a single
+combined work, and to convey the resulting work. The terms of this
+License will continue to apply to the part which is the covered work,
+but the special requirements of the GNU Affero General Public License,
+section 13, concerning interaction through a network will apply to the
+combination as such.
+
+ 14. Revised Versions of this License.
+
+ The Free Software Foundation may publish revised and/or new versions of
+the GNU General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+ Each version is given a distinguishing version number. If the
+Program specifies that a certain numbered version of the GNU General
+Public License "or any later version" applies to it, you have the
+option of following the terms and conditions either of that numbered
+version or of any later version published by the Free Software
+Foundation. If the Program does not specify a version number of the
+GNU General Public License, you may choose any version ever published
+by the Free Software Foundation.
+
+ If the Program specifies that a proxy can decide which future
+versions of the GNU General Public License can be used, that proxy's
+public statement of acceptance of a version permanently authorizes you
+to choose that version for the Program.
+
+ Later license versions may give you additional or different
+permissions. However, no additional obligations are imposed on any
+author or copyright holder as a result of your choosing to follow a
+later version.
+
+ 15. Disclaimer of Warranty.
+
+ THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
+APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
+HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
+OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
+IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
+ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+ 16. Limitation of Liability.
+
+ IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
+THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
+GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
+USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
+DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
+PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
+EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGES.
+
+ 17. Interpretation of Sections 15 and 16.
+
+ If the disclaimer of warranty and limitation of liability provided
+above cannot be given local legal effect according to their terms,
+reviewing courts shall apply local law that most closely approximates
+an absolute waiver of all civil liability in connection with the
+Program, unless a warranty or assumption of liability accompanies a
+copy of the Program in return for a fee.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Programs
+
+ If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+ To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+state the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+ <one line to give the program's name and a brief idea of what it does.>
+ Copyright (C) <year> <name of author>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+Also add information on how to contact you by electronic and paper mail.
+
+ If the program does terminal interaction, make it output a short
+notice like this when it starts in an interactive mode:
+
+ <program> Copyright (C) <year> <name of author>
+ This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+ This is free software, and you are welcome to redistribute it
+ under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License. Of course, your program's commands
+might be different; for a GUI interface, you would use an "about box".
+
+ You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU GPL, see
+<http://www.gnu.org/licenses/>.
+
+ The GNU General Public License does not permit incorporating your program
+into proprietary programs. If your program is a subroutine library, you
+may consider it more useful to permit linking proprietary applications with
+the library. If this is what you want to do, use the GNU Lesser General
+Public License instead of this License. But first, please read
+<http://www.gnu.org/philosophy/why-not-lgpl.html>.
diff --git a/Makefile.am b/Makefile.am
new file mode 100644
index 0000000..5643873
--- /dev/null
+++ b/Makefile.am
@@ -0,0 +1,17 @@
+include guile.am
+
+SOURCES = \
+ knots.scm \
+ knots/non-blocking.scm \
+ knots/parallelism.scm \
+ knots/promise.scm \
+ knots/queue.scm \
+ knots/resource-pool.scm \
+ knots/timeout.scm \
+ knots/web-server.scm \
+ knots/worker-threads.scm
+
+EXTRA_DIST = \
+ README \
+ bootstrap \
+ pre-inst-env.in
diff --git a/README b/README
new file mode 100644
index 0000000..e593a79
--- /dev/null
+++ b/README
@@ -0,0 +1,4 @@
+-*- mode: org -*-
+
+This Guile library provides useful patterns and functionality to use
+Guile Fibers.
diff --git a/bootstrap b/bootstrap
new file mode 100755
index 0000000..5af6611
--- /dev/null
+++ b/bootstrap
@@ -0,0 +1,3 @@
+#! /bin/sh
+
+autoreconf --verbose --install --force
diff --git a/configure.ac b/configure.ac
new file mode 100644
index 0000000..fe1cb2f
--- /dev/null
+++ b/configure.ac
@@ -0,0 +1,19 @@
+AC_INIT([guile-knots], [0.1])
+AC_CONFIG_AUX_DIR([build-aux])
+AM_INIT_AUTOMAKE([-Wall -Werror foreign])
+
+GUILE_PKG([3.0])
+GUILE_PROGS
+if test "x$GUILD" = "x"; then
+ AC_MSG_ERROR(['guild' binary not found; please check your guile 3 installation.])
+fi
+
+if test "$cross_compiling" != no; then
+ GUILE_TARGET="--target=$host_alias"
+ AC_SUBST([GUILE_TARGET])
+fi
+
+AC_CONFIG_FILES([Makefile])
+AC_CONFIG_FILES([pre-inst-env], [chmod +x pre-inst-env])
+
+AC_OUTPUT
diff --git a/guile.am b/guile.am
new file mode 100644
index 0000000..743bdea
--- /dev/null
+++ b/guile.am
@@ -0,0 +1,21 @@
+moddir=$(datadir)/guile/site/$(GUILE_EFFECTIVE_VERSION)
+godir=$(libdir)/guile/$(GUILE_EFFECTIVE_VERSION)/site-ccache
+
+GOBJECTS = $(SOURCES:%.scm=%.go)
+
+nobase_dist_mod_DATA = $(SOURCES) $(NOCOMP_SOURCES)
+nobase_go_DATA = $(GOBJECTS)
+
+# Make sure source files are installed first, so that the mtime of
+# installed compiled files is greater than that of installed source
+# files. See
+# <http://lists.gnu.org/archive/html/guile-devel/2010-07/msg00125.html>
+# for details.
+guile_install_go_files = install-nobase_goDATA
+$(guile_install_go_files): install-nobase_dist_modDATA
+
+CLEANFILES = $(GOBJECTS)
+GUILE_WARNINGS = -Wunbound-variable -Warity-mismatch -Wformat
+SUFFIXES = .scm .go
+.scm.go:
+ $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILD) compile $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<"
diff --git a/guix-dev.scm b/guix-dev.scm
new file mode 100644
index 0000000..6b51e8a
--- /dev/null
+++ b/guix-dev.scm
@@ -0,0 +1,50 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+;;; Run the following command to enter a development environment for
+;;; Guile Knots:
+;;;
+;;; $ guix environment -l guix-dev.scm
+
+(use-modules ((guix licenses) #:prefix license:)
+ (guix packages)
+ (guix build-system gnu)
+ (gnu packages)
+ (gnu packages autotools)
+ (gnu packages guile)
+ (gnu packages guile-xyz)
+ (gnu packages pkg-config)
+ (srfi srfi-1))
+
+(package
+ (name "guile-knots")
+ (version "0.0.0")
+ (source #f)
+ (build-system gnu-build-system)
+ (inputs
+ (list guile-3.0
+ guile-fibers))
+ (native-inputs
+ (list autoconf
+ automake
+ pkg-config))
+ (synopsis "TODO")
+ (description "TODO")
+ (home-page "TODO")
+ (license license:gpl3+))
diff --git a/knots.scm b/knots.scm
new file mode 100644
index 0000000..d545770
--- /dev/null
+++ b/knots.scm
@@ -0,0 +1,21 @@
+(define-module (knots)
+ #:use-module (ice-9 suspendable-ports)
+ #:export (call-with-default-io-waiters
+
+ wait-when-system-clock-behind))
+
+(define (call-with-default-io-waiters thunk)
+ (parameterize
+ ((current-read-waiter (@@ (ice-9 suspendable-ports)
+ default-read-waiter))
+ (current-write-waiter (@@ (ice-9 suspendable-ports)
+ default-write-waiter)))
+ (thunk)))
+
+(define (wait-when-system-clock-behind)
+ (let ((start-of-the-year-2000 946684800))
+ (while (< (current-time)
+ start-of-the-year-2000)
+ (simple-format (current-error-port)
+ "warning: system clock potentially behind, waiting\n")
+ (sleep 20))))
diff --git a/knots/non-blocking.scm b/knots/non-blocking.scm
new file mode 100644
index 0000000..5914143
--- /dev/null
+++ b/knots/non-blocking.scm
@@ -0,0 +1,63 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (knots non-blocking)
+ #:use-module (web uri)
+ #:use-module (web client)
+ #:export (non-blocking-port
+ nonblocking-open-socket-for-uri))
+
+(define (non-blocking-port port)
+ "Make PORT non-blocking and return it."
+ (let ((flags (fcntl port F_GETFL)))
+ (when (zero? (logand O_NONBLOCK flags))
+ (fcntl port F_SETFL (logior O_NONBLOCK flags)))
+ port))
+
+(define* (nonblocking-open-socket-for-uri uri
+ #:key (verify-certificate? #t))
+ (define tls-wrap
+ (@@ (web client) tls-wrap))
+
+ (define https?
+ (eq? 'https (uri-scheme uri)))
+
+ (define plain-uri
+ (if https?
+ (build-uri
+ 'http
+ #:userinfo (uri-userinfo uri)
+ #:host (uri-host uri)
+ #:port (or (uri-port uri) 443)
+ #:path (uri-path uri)
+ #:query (uri-query uri)
+ #:fragment (uri-fragment uri))
+ uri))
+
+ (let ((s (open-socket-for-uri plain-uri)))
+ (if https?
+ (let ((port
+ (tls-wrap s (uri-host uri)
+ #:verify-certificate? verify-certificate?)))
+ ;; Guile/guile-gnutls don't handle the handshake happening on a non
+ ;; blocking socket, so change the behavior here.
+ (non-blocking-port s)
+ port)
+ (non-blocking-port s))))
+
diff --git a/knots/parallelism.scm b/knots/parallelism.scm
new file mode 100644
index 0000000..b5b03d5
--- /dev/null
+++ b/knots/parallelism.scm
@@ -0,0 +1,197 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (knots parallelism)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 match)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
+ #:export (fibers-batch-map
+ fibers-map
+
+ fibers-map-with-progress
+
+ fibers-batch-for-each
+ fibers-for-each
+
+ fibers-parallel
+ fibers-let))
+
+;; Like split-at, but don't care about the order of the resulting lists, and
+;; don't error if the list is shorter than i elements
+(define (split-at* lst i)
+ (let lp ((l lst) (n i) (acc '()))
+ (if (or (<= n 0) (null? l))
+ (values (reverse! acc) l)
+ (lp (cdr l) (- n 1) (cons (car l) acc)))))
+
+;; As this can be called with lists with tens of thousands of items in them,
+;; batch the
+(define (get-batch batch-size lists)
+ (let ((split-lists
+ (map (lambda (lst)
+ (let ((batch rest (split-at* lst batch-size)))
+ (cons batch rest)))
+ lists)))
+ (values (map car split-lists)
+ (map cdr split-lists))))
+
+(define (defer-to-parallel-fiber thunk)
+ (let ((reply (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (put-message reply (cons 'exception exn)))
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (with-throw-handler #t
+ thunk
+ (lambda _
+ (backtrace))))
+ (lambda vals
+ (put-message reply vals))))
+ #:unwind? #t))
+ #:parallel? #t)
+ reply))
+
+(define (fetch-result-of-defered-thunks . reply-channels)
+ (let ((responses (map get-message
+ reply-channels)))
+ (map
+ (match-lambda
+ (('exception . exn)
+ (raise-exception exn))
+ (result
+ (apply values result)))
+ responses)))
+
+(define (fibers-batch-map proc batch-size . lists)
+ (let loop ((lists lists)
+ (result '()))
+ (let ((batch
+ rest
+ (get-batch batch-size lists)))
+ (if (any null? batch)
+ result
+ (let ((response-channels
+ (apply map
+ (lambda args
+ (defer-to-parallel-fiber
+ (lambda ()
+ (apply proc args))))
+ batch)))
+ (loop rest
+ (append! result
+ (apply fetch-result-of-defered-thunks
+ response-channels))))))))
+
+(define (fibers-map proc . lists)
+ (apply fibers-batch-map proc 20 lists))
+
+(define (fibers-batch-for-each proc batch-size . lists)
+ (let loop ((lists lists))
+ (let ((batch
+ rest
+ (get-batch batch-size lists)))
+ (if (any null? batch)
+ *unspecified*
+ (let ((response-channels
+ (apply map
+ (lambda args
+ (defer-to-parallel-fiber
+ (lambda ()
+ (apply proc args))))
+ batch)))
+ (apply fetch-result-of-defered-thunks
+ response-channels)
+ (loop rest))))))
+
+(define (fibers-for-each proc . lists)
+ (apply fibers-batch-for-each proc 20 lists))
+
+(define-syntax fibers-parallel
+ (lambda (x)
+ (syntax-case x ()
+ ((_ e0 ...)
+ (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
+ #'(let ((tmp0 (defer-to-parallel-fiber
+ (lambda ()
+ e0)))
+ ...)
+ (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
+
+(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
+ (call-with-values
+ (lambda () (fibers-parallel e ...))
+ (lambda (v ...)
+ b0 b1 ...)))
+
+(define* (fibers-map-with-progress proc lists #:key report)
+ (let loop ((channels-to-results
+ (apply map
+ (lambda args
+ (cons (defer-to-parallel-fiber
+ (lambda ()
+ (apply proc args)))
+ #f))
+ lists)))
+ (let ((active-channels
+ (filter-map car channels-to-results)))
+ (when report
+ (report (apply map
+ list
+ (map cdr channels-to-results)
+ lists)))
+ (if (null? active-channels)
+ (map
+ (match-lambda
+ ((#f . ('exception . exn))
+ (raise-exception exn))
+ ((#f . ('result . val))
+ val))
+ channels-to-results)
+ (loop
+ (perform-operation
+ (apply
+ choice-operation
+ (filter-map
+ (lambda (p)
+ (match p
+ ((channel . _)
+ (if channel
+ (wrap-operation
+ (get-operation channel)
+ (lambda (result)
+ (map (match-lambda
+ ((c . r)
+ (if (eq? channel c)
+ (cons #f
+ (match result
+ (('exception . exn)
+ result)
+ (_
+ (cons 'result result))))
+ (cons c r))))
+ channels-to-results)))
+ #f))))
+ channels-to-results))))))))
diff --git a/knots/promise.scm b/knots/promise.scm
new file mode 100644
index 0000000..3eb0a81
--- /dev/null
+++ b/knots/promise.scm
@@ -0,0 +1,78 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (knots promise)
+ #:use-module (srfi srfi-9)
+ #:use-module (ice-9 atomic)
+ #:use-module (fibers)
+ #:use-module (fibers conditions)
+ #:export (fibers-delay
+ fibers-force
+ fibers-promise-reset))
+
+(define-record-type <fibers-promise>
+ (make-fibers-promise thunk values-box evaluated-condition)
+ fibers-promise?
+ (thunk fibers-promise-thunk)
+ (values-box fibers-promise-values-box)
+ (evaluated-condition fibers-promise-evaluated-condition))
+
+(define (fibers-delay thunk)
+ (make-fibers-promise
+ thunk
+ (make-atomic-box #f)
+ (make-condition)))
+
+(define (fibers-force fp)
+ (let ((res (atomic-box-compare-and-swap!
+ (fibers-promise-values-box fp)
+ #f
+ 'started)))
+ (if (eq? #f res)
+ (call-with-values
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (atomic-box-set! (fibers-promise-values-box fp)
+ exn)
+ (signal-condition!
+ (fibers-promise-evaluated-condition fp))
+ (raise-exception exn))
+ (fibers-promise-thunk fp)
+ #:unwind? #t))
+ (lambda vals
+ (atomic-box-set! (fibers-promise-values-box fp)
+ vals)
+ (signal-condition!
+ (fibers-promise-evaluated-condition fp))
+ (apply values vals)))
+ (if (eq? res 'started)
+ (begin
+ (wait (fibers-promise-evaluated-condition fp))
+ (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
+ (if (exception? result)
+ (raise-exception result)
+ (apply values result))))
+ (if (exception? res)
+ (raise-exception res)
+ (apply values res))))))
+
+(define (fibers-promise-reset fp)
+ (atomic-box-set! (fibers-promise-values-box fp)
+ #f))
diff --git a/knots/queue.scm b/knots/queue.scm
new file mode 100644
index 0000000..ec9f703
--- /dev/null
+++ b/knots/queue.scm
@@ -0,0 +1,47 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (knots queue)
+ #:use-module (ice-9 q)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
+ #:export (spawn-queueing-fiber))
+
+(define (spawn-queueing-fiber dest-channel)
+ (define queue (make-q))
+
+ (let ((queue-channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (if (q-empty? queue)
+ (enq! queue
+ (perform-operation
+ (get-operation queue-channel)))
+ (let ((front (q-front queue)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation (get-operation queue-channel)
+ (lambda (val)
+ (enq! queue val)))
+ (wrap-operation (put-operation dest-channel front)
+ (lambda _
+ (q-pop! queue))))))))))
+ queue-channel))
diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm
new file mode 100644
index 0000000..67e9292
--- /dev/null
+++ b/knots/resource-pool.scm
@@ -0,0 +1,485 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (knots resource-pool)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 exceptions)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers scheduler)
+ #:use-module (fibers operations)
+ #:export (resource-pool?
+
+ make-resource-pool
+ destroy-resource-pool
+
+ resource-pool-default-timeout
+ resource-pool-retry-checkout-timeout
+
+ &resource-pool-timeout
+ resource-pool-timeout-error?
+
+ resource-pool-default-timeout-handler
+
+ call-with-resource-from-pool
+ with-resource-from-pool
+
+ resource-pool-stats))
+
+(define-record-type <resource-pool>
+ (make-resource-pool-record name channel)
+ resource-pool?
+ (name resource-pool-name)
+ (channel resource-pool-channel))
+
+(define* (make-resource-pool initializer max-size
+ #:key (min-size max-size)
+ (idle-seconds #f)
+ (delay-logger (const #f))
+ (duration-logger (const #f))
+ destructor
+ lifetime
+ scheduler
+ (name "unnamed")
+ ;; Add options for customizing timeouts
+ )
+ (define (initializer/safe)
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running ~A resource pool initializer: ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running resource pool destructor (~A): ~A:\n ~A\n"
+ name
+ destructor
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 5)
+ (destructor/safe args)))))
+
+ (let ((channel (make-channel))
+ (checkout-failure-count 0))
+ (spawn-fiber
+ (lambda ()
+ (when idle-seconds
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (sleep idle-seconds)
+ (put-message channel '(check-for-idle-resources))))))
+
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception in the ~A pool fiber: ~A\n"
+ name
+ exn))
+ (lambda ()
+ (let loop ((resources '())
+ (available '())
+ (waiters '())
+ (resources-last-used '()))
+
+ (match (get-message channel)
+ (('checkout reply)
+ (if (null? available)
+ (if (= (length resources) max-size)
+ (loop resources
+ available
+ (cons reply waiters)
+ resources-last-used)
+ (let ((new-resource (initializer/safe)))
+ (if new-resource
+ (let ((checkout-success?
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation reply new-resource)
+ (const #t))
+ (wrap-operation (sleep-operation 1)
+ (const #f))))))
+ (unless checkout-success?
+ (set! checkout-failure-count
+ (+ 1 checkout-failure-count)))
+
+ (loop (cons new-resource resources)
+ (if checkout-success?
+ available
+ (cons new-resource available))
+ waiters
+ (cons (get-internal-real-time)
+ resources-last-used)))
+ (loop resources
+ available
+ (cons reply waiters)
+ resources-last-used))))
+ (let ((checkout-success?
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation reply (car available))
+ (const #t))
+ (wrap-operation (sleep-operation 1)
+ (const #f))))))
+ (unless checkout-success?
+ (set! checkout-failure-count
+ (+ 1 checkout-failure-count)))
+
+ (if checkout-success?
+ (loop resources
+ (cdr available)
+ waiters
+ resources-last-used)
+ (loop resources
+ available
+ waiters
+ resources-last-used)))))
+ (('return resource)
+ (if (null? waiters)
+ (loop resources
+ (cons resource available)
+ waiters
+ (begin
+ (list-set!
+ resources-last-used
+ (list-index (lambda (x)
+ (eq? x resource))
+ resources)
+ (get-internal-real-time))
+ resources-last-used))
+ (let ((checkout-success?
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation (last waiters)
+ resource)
+ (const #t))
+ (wrap-operation (sleep-operation 1)
+ (const #f))))))
+ (unless checkout-success?
+ (set! checkout-failure-count
+ (+ 1 checkout-failure-count)))
+
+ (if checkout-success?
+ (loop resources
+ available
+ (drop-right! waiters 1)
+ (begin
+ (list-set!
+ resources-last-used
+ (list-index (lambda (x)
+ (eq? x resource))
+ resources)
+ (get-internal-real-time))
+ resources-last-used))
+ (begin
+ (for-each
+ (lambda (waiter)
+ (spawn-fiber
+ (lambda ()
+ (perform-operation
+ (choice-operation
+ (put-operation waiter 'resource-pool-retry-checkout)
+ (sleep-operation 10))))))
+ waiters)
+
+ (loop resources
+ (cons resource available)
+ '()
+ (begin
+ (list-set!
+ resources-last-used
+ (list-index (lambda (x)
+ (eq? x resource))
+ resources)
+ (get-internal-real-time))
+ resources-last-used)))))))
+ (('stats reply)
+ (let ((stats
+ `((resources . ,(length resources))
+ (available . ,(length available))
+ (waiters . ,(length waiters))
+ (checkout-failure-count . ,checkout-failure-count))))
+
+ (spawn-fiber
+ (lambda ()
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation reply stats)
+ (const #t))
+ (wrap-operation (sleep-operation 1)
+ (const #f)))))))
+
+ (loop resources
+ available
+ waiters
+ resources-last-used))
+ (('check-for-idle-resources)
+ (let* ((resources-last-used-seconds
+ (map
+ (lambda (internal-time)
+ (/ (- (get-internal-real-time) internal-time)
+ internal-time-units-per-second))
+ resources-last-used))
+ (resources-to-destroy
+ (filter-map
+ (lambda (resource last-used-seconds)
+ (if (and (member resource available)
+ (> last-used-seconds idle-seconds))
+ resource
+ #f))
+ resources
+ resources-last-used-seconds)))
+
+ (for-each
+ (lambda (resource)
+ (destructor/safe resource))
+ resources-to-destroy)
+
+ (loop (lset-difference eq? resources resources-to-destroy)
+ (lset-difference eq? available resources-to-destroy)
+ waiters
+ (filter-map
+ (lambda (resource last-used)
+ (if (memq resource resources-to-destroy)
+ #f
+ last-used))
+ resources
+ resources-last-used))))
+ (('destroy reply)
+ (if (= (length resources) (length available))
+ (begin
+ (for-each
+ (lambda (resource)
+ (destructor/safe resource))
+ resources)
+ (put-message reply 'destroy-success))
+ (begin
+ (spawn-fiber
+ (lambda ()
+ (perform-operation
+ (choice-operation
+ (put-operation reply 'resource-pool-destroy-failed)
+ (sleep-operation 10)))))
+ (loop resources
+ available
+ waiters
+ resources-last-used))))
+ (unknown
+ (simple-format
+ (current-error-port)
+ "unrecognised message to ~A resource pool channel: ~A\n"
+ name
+ unknown)
+ (loop resources
+ available
+ waiters
+ resources-last-used)))))
+ #:unwind? #t)))
+ (or scheduler
+ (current-scheduler)))
+
+ (make-resource-pool-record name channel)))
+
+(define (destroy-resource-pool pool)
+ (let ((reply (make-channel)))
+ (put-message (resource-pool-channel pool)
+ (list 'destroy reply))
+ (let ((msg (get-message reply)))
+ (unless (eq? msg 'destroy-success)
+ (error msg)))))
+
+(define resource-pool-default-timeout
+ (make-parameter #f))
+
+(define resource-pool-retry-checkout-timeout
+ (make-parameter 5))
+
+(define &resource-pool-timeout
+ (make-exception-type '&recource-pool-timeout
+ &error
+ '(name)))
+
+(define make-resource-pool-timeout-error
+ (record-constructor &resource-pool-timeout))
+
+(define resource-pool-timeout-error?
+ (record-predicate &resource-pool-timeout))
+
+(define resource-pool-default-timeout-handler
+ (make-parameter #f))
+
+(define* (call-with-resource-from-pool
+ pool proc #:key (timeout 'default)
+ (timeout-handler (resource-pool-default-timeout-handler)))
+ "Call PROC with a resource from POOL, blocking until a resource becomes
+available. Return the resource once PROC has returned."
+
+ (define retry-timeout
+ (resource-pool-retry-checkout-timeout))
+
+ (define timeout-or-default
+ (if (eq? timeout 'default)
+ (resource-pool-default-timeout)
+ timeout))
+
+ (let ((resource
+ (let ((reply (make-channel)))
+ (let loop ((start-time (get-internal-real-time)))
+ (let ((request-success?
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation (resource-pool-channel pool)
+ `(checkout ,reply))
+ (const #t))
+ (wrap-operation (sleep-operation (or timeout-or-default
+ retry-timeout))
+ (const #f))))))
+ (if request-success?
+ (let ((time-remaining
+ (- (or timeout-or-default
+ retry-timeout)
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second))))
+ (if (> time-remaining 0)
+ (let ((response
+ (perform-operation
+ (choice-operation
+ (get-operation reply)
+ (wrap-operation (sleep-operation time-remaining)
+ (const #f))))))
+ (if (or (not response)
+ (eq? response 'resource-pool-retry-checkout))
+ (if (> (- (or timeout-or-default
+ retry-timeout)
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second))
+ 0)
+ (loop start-time)
+ (if (eq? timeout-or-default #f)
+ (loop (get-internal-real-time))
+ #f))
+ response))
+ (if (eq? timeout-or-default #f)
+ (loop (get-internal-real-time))
+ #f)))
+ (if (eq? timeout-or-default #f)
+ (loop (get-internal-real-time))
+ #f)))))))
+
+ (when (or (not resource)
+ (eq? resource 'resource-pool-retry-checkout))
+ (when timeout-handler
+ (timeout-handler pool proc timeout))
+
+ (raise-exception
+ (make-resource-pool-timeout-error (resource-pool-name pool))))
+
+ (with-exception-handler
+ (lambda (exception)
+ (put-message (resource-pool-channel pool)
+ `(return ,resource))
+ (raise-exception exception))
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (proc resource))
+ (lambda _
+ (backtrace))))
+ (lambda vals
+ (put-message (resource-pool-channel pool)
+ `(return ,resource))
+ (apply values vals))))
+ #:unwind? #t)))
+
+(define-syntax-rule (with-resource-from-pool pool resource exp ...)
+ (call-with-resource-from-pool
+ pool
+ (lambda (resource) exp ...)))
+
+(define* (resource-pool-stats pool #:key (timeout 5))
+ (let ((reply (make-channel))
+ (start-time (get-internal-real-time)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation
+ (put-operation (resource-pool-channel pool)
+ `(stats ,reply))
+ (const #t))
+ (wrap-operation (sleep-operation timeout)
+ (lambda _
+ (raise-exception
+ (make-resource-pool-timeout-error))))))
+
+ (let ((time-remaining
+ (- timeout
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second))))
+ (if (> time-remaining 0)
+ (perform-operation
+ (choice-operation
+ (get-operation reply)
+ (wrap-operation (sleep-operation time-remaining)
+ (lambda _
+ (raise-exception
+ (make-resource-pool-timeout-error))))))
+ (raise-exception
+ (make-resource-pool-timeout-error))))))
+
diff --git a/knots/timeout.scm b/knots/timeout.scm
new file mode 100644
index 0000000..8f8a2fb
--- /dev/null
+++ b/knots/timeout.scm
@@ -0,0 +1,200 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (knots timeout)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 exceptions)
+ #:use-module (ice-9 ports internal)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers scheduler)
+ #:use-module (fibers operations)
+ #:export (with-fibers-timeout
+
+ &port-timeout
+ port-timeout-error?
+
+ &port-read-timeout
+ port-read-timeout-error
+
+ &port-write-timeout
+ port-write-timeout-error?
+
+ with-port-timeouts))
+
+(define* (with-fibers-timeout thunk #:key timeout on-timeout)
+ (let ((channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ (perform-operation
+ (choice-operation
+ (put-operation channel (cons 'exception exn))
+ (sleep-operation timeout))))
+ (lambda ()
+ (call-with-values thunk
+ (lambda vals
+ (perform-operation
+ (choice-operation
+ (put-operation channel vals)
+ (sleep-operation timeout))))))
+ #:unwind? #t)))
+
+ (match (perform-operation
+ (choice-operation
+ (get-operation channel)
+ (wrap-operation (sleep-operation timeout)
+ (const 'timeout))))
+ ('timeout
+ (on-timeout))
+ (('exception . exn)
+ (raise-exception exn))
+ (vals
+ (apply values vals)))))
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(thunk port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define (readable? port)
+ "Test if PORT is writable."
+ (= 1 (port-poll port "r" 0)))
+
+(define (writable? port)
+ "Test if PORT is writable."
+ (= 1 (port-poll port "w" 0)))
+
+(define (make-wait-operation ready? schedule-when-ready port
+ port-ready-fd this-procedure)
+ (make-base-operation #f
+ (lambda _
+ (and (ready? port) values))
+ (lambda (flag sched resume)
+ (define (commit)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (commit))
+ ('S #f)))
+ (schedule-when-ready
+ sched (port-ready-fd port) commit))))
+
+(define (wait-until-port-readable-operation port)
+ "Make an operation that will succeed when PORT is readable."
+ (unless (input-port? port)
+ (error "refusing to wait forever for input on non-input port"))
+ (make-wait-operation readable? schedule-task-when-fd-readable port
+ port-read-wait-fd
+ wait-until-port-readable-operation))
+
+(define (wait-until-port-writable-operation port)
+ "Make an operation that will succeed when PORT is writable."
+ (unless (output-port? port)
+ (error "refusing to wait forever for output on non-output port"))
+ (make-wait-operation writable? schedule-task-when-fd-writable port
+ port-write-wait-fd
+ wait-until-port-writable-operation))
+
+(define* (with-port-timeouts thunk
+ #:key timeout
+ (read-timeout timeout)
+ (write-timeout timeout))
+ (define (no-fibers-wait thunk port mode timeout)
+ (define poll-timeout-ms 200)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout
+ ;; remains unchanged! When the timeout is longer than the time
+ ;; between the syscall restarting, I think this renders the
+ ;; timeout useless. Therefore, this code uses a short timeout, and
+ ;; repeatedly calls poll while watching the clock to see if it has
+ ;; timed out overall.
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second timeout))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error thunk port)
+ (make-port-write-timeout-error thunk port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation
+ (sleep-operation read-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-read-timeout-error thunk port))))))
+ (no-fibers-wait thunk port "r" read-timeout))))
+ (current-write-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-writable-operation port)
+ (wrap-operation
+ (sleep-operation write-timeout)
+ (lambda ()
+ (raise-exception
+ (make-port-write-timeout-error thunk port))))))
+ (no-fibers-wait thunk port "w" write-timeout)))))
+ (thunk)))
diff --git a/knots/web-server.scm b/knots/web-server.scm
new file mode 100644
index 0000000..b94106f
--- /dev/null
+++ b/knots/web-server.scm
@@ -0,0 +1,263 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;; Copyright (C) 2010-2013,2015,2017 Free Software Foundation, Inc.
+
+;; This library is free software; you can redistribute it and/or
+;; modify it under the terms of the GNU Lesser General Public
+;; License as published by the Free Software Foundation; either
+;; version 3 of the License, or (at your option) any later version.
+;;
+;; This library is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public License
+;; along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (knots web-server)
+ #:use-module (srfi srfi-9)
+ #:use-module (srfi srfi-71)
+ #:use-module (fibers)
+ #:use-module (fibers conditions)
+ #:use-module (rnrs bytevectors)
+ #:use-module (ice-9 binary-ports)
+ #:use-module (ice-9 textual-ports)
+ #:use-module (ice-9 iconv)
+ #:use-module (ice-9 match)
+ #:use-module ((srfi srfi-9 gnu) #:select (set-field))
+ #:use-module (system repl error-handling)
+ #:use-module (web http)
+ #:use-module (web request)
+ #:use-module (web response)
+ #:use-module (knots non-blocking)
+ #:export (run-knots-web-server
+
+ web-server?
+ web-server-socket
+ web-server-port))
+
+(define (make-default-socket family addr port)
+ (let ((sock (socket PF_INET SOCK_STREAM 0)))
+ (setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
+ (fcntl sock F_SETFD FD_CLOEXEC)
+ (bind sock family addr port)
+ sock))
+
+(define (extend-response r k v . additional)
+ (define (extend-alist alist k v)
+ (let ((pair (assq k alist)))
+ (acons k v (if pair (delq pair alist) alist))))
+ (let ((r (set-field r (response-headers)
+ (extend-alist (response-headers r) k v))))
+ (if (null? additional)
+ r
+ (apply extend-response r additional))))
+
+;; -> response body
+(define (sanitize-response request response body)
+ "\"Sanitize\" the given response and body, making them appropriate for
+the given request.
+
+As a convenience to web handler authors, RESPONSE may be given as
+an alist of headers, in which case it is used to construct a default
+response. Ensures that the response version corresponds to the request
+version. If BODY is a string, encodes the string to a bytevector,
+in an encoding appropriate for RESPONSE. Adds a
+‘content-length’ and ‘content-type’ header, as necessary.
+
+If BODY is a procedure, it is called with a port as an argument,
+and the output collected as a bytevector. In the future we might try to
+instead use a compressing, chunk-encoded port, and call this procedure
+later, in the write-client procedure. Authors are advised not to rely
+on the procedure being called at any particular time."
+ (cond
+ ((list? response)
+ (sanitize-response request
+ (build-response #:version (request-version request)
+ #:headers response)
+ body))
+ ((not (equal? (request-version request) (response-version response)))
+ (sanitize-response request
+ (adapt-response-version response
+ (request-version request))
+ body))
+ ((not body)
+ (values response #vu8()))
+ ((string? body)
+ (let* ((type (response-content-type response
+ '(text/plain)))
+ (declared-charset (assq-ref (cdr type) 'charset))
+ (charset (or declared-charset "utf-8")))
+ (sanitize-response
+ request
+ (if declared-charset
+ response
+ (extend-response response 'content-type
+ `(,@type (charset . ,charset))))
+ (string->bytevector body charset))))
+ ((not (or (bytevector? body)
+ (procedure? body)))
+ (error "unexpected body type"))
+ ((and (response-must-not-include-body? response)
+ body
+ ;; FIXME make this stricter: even an empty body should be prohibited.
+ (not (zero? (bytevector-length body))))
+ (error "response with this status code must not include body" response))
+ (else
+ ;; check length; assert type; add other required fields?
+ (values (if (procedure? body)
+ (if (response-content-length response)
+ response
+ (extend-response response
+ 'transfer-encoding
+ '((chunked))))
+ (let ((rlen (response-content-length response))
+ (blen (bytevector-length body)))
+ (cond
+ (rlen (if (= rlen blen)
+ response
+ (error "bad content-length" rlen blen)))
+ (else (extend-response response 'content-length blen)))))
+ (if (eq? (request-method request) 'HEAD)
+ ;; Responses to HEAD requests must not include bodies.
+ ;; We could raise an error here, but it seems more
+ ;; appropriate to just do something sensible.
+ #f
+ body)))))
+
+(define (with-stack-and-prompt thunk)
+ (call-with-prompt (default-prompt-tag)
+ (lambda () (start-stack #t (thunk)))
+ (lambda (k proc)
+ (with-stack-and-prompt (lambda () (proc k))))))
+
+(define (keep-alive? response)
+ (let ((v (response-version response)))
+ (and (or (< (response-code response) 400)
+ (= (response-code response) 404))
+ (case (car v)
+ ((1)
+ (case (cdr v)
+ ((1) (not (memq 'close (response-connection response))))
+ ((0) (memq 'keep-alive (response-connection response)))))
+ (else #f)))))
+
+(define (handle-request handler client)
+ (let ((request
+ (catch #t
+ (lambda ()
+ (read-request client))
+ (lambda (key . args)
+ (display "While reading request:\n" (current-error-port))
+ (print-exception (current-error-port) #f key args)
+ #f))))
+ (let ((response
+ body
+ (cond
+ ((not request)
+ ;; Bad request.
+ (values (build-response #:version '(1 . 0) #:code 400
+ #:headers '((content-length . 0)))
+ #vu8()))
+ (else
+ (call-with-error-handling
+ (lambda ()
+ (call-with-values (lambda ()
+ (with-stack-and-prompt
+ (lambda ()
+ (handler request))))
+ (lambda (response body)
+ (sanitize-response request response body))))
+ #:on-error 'backtrace
+ #:post-error (lambda _
+ (values (build-response #:code 500) #f)))))))
+ (write-response response client)
+ (when body
+ (if (procedure? body)
+ (if (response-content-length response)
+ (body client)
+ (let ((chunked-port
+ (make-chunked-output-port client
+ #:keep-alive? #t)))
+ (body chunked-port)
+ (close-port chunked-port)))
+ (put-bytevector client body)))
+ (force-output client)
+
+ (keep-alive? response))))
+
+(define (client-loop client handler)
+ ;; Always disable Nagle's algorithm, as we handle buffering
+ ;; ourselves; when we force-output, we really want the data to go
+ ;; out.
+ (setvbuf client 'block 1024)
+ (setsockopt client IPPROTO_TCP TCP_NODELAY 1)
+ (with-throw-handler #t
+ (lambda ()
+ (let loop ()
+ (cond
+ ((catch #t
+ (lambda () (eof-object? (lookahead-u8 client)))
+ (lambda _ #t))
+ (close-port client))
+ (else
+ (let ((keep-alive? (handle-request handler client)))
+ (if keep-alive?
+ (loop)
+ (close-port client)))))))
+ (lambda (k . args)
+ (close-port client))))
+
+(define-record-type <web-server>
+ (make-web-server socket port)
+ web-server?
+ (socket web-server-socket)
+ (port web-server-port))
+
+(define* (run-knots-web-server handler #:key
+ (host #f)
+ (family AF_INET)
+ (addr (if host
+ (inet-pton family host)
+ INADDR_LOOPBACK))
+ (port 8080)
+ (socket (make-default-socket family addr port)))
+ "Run the fibers web server.
+
+HANDLER should be a procedure that takes one argument, the HTTP
+request and returns two values, the response and response body.
+
+For example, here is a simple \"Hello, World!\" server:
+
+@example
+ (define (handler request)
+ (let ((body (read-request-body request)))
+ (values '((content-type . (text/plain)))
+ \"Hello, World!\")))
+ (run-server handler)
+@end example
+
+The response and body will be run through ‘sanitize-response’
+before sending back to the client."
+ (non-blocking-port socket)
+ ;; We use a large backlog by default. If the server is suddenly hit
+ ;; with a number of connections on a small backlog, clients won't
+ ;; receive confirmation for their SYN, leading them to retry --
+ ;; probably successfully, but with a large latency.
+ (listen socket 1024)
+ (sigaction SIGPIPE SIG_IGN)
+
+ (spawn-fiber
+ (lambda ()
+ (let loop ()
+ (match (accept socket (logior SOCK_NONBLOCK SOCK_CLOEXEC))
+ ((client . sockaddr)
+ (spawn-fiber (lambda ()
+ (client-loop client handler))
+ #:parallel? #t)
+ (loop))))))
+
+ (make-web-server socket
+ (vector-ref (getsockname socket)
+ 2))) ; Not sure what this structure is
diff --git a/knots/worker-threads.scm b/knots/worker-threads.scm
new file mode 100644
index 0000000..f4116e9
--- /dev/null
+++ b/knots/worker-threads.scm
@@ -0,0 +1,577 @@
+;;; Guile Knots
+;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This file is part of Guile Knots.
+;;;
+;;; The Guile Knots is free software; you can redistribute it and/or
+;;; modify it under the terms of the GNU General Public License as
+;;; published by the Free Software Foundation; either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; The Guile Knots is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with the guix-data-service. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (knots worker-threads)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19)
+ #:use-module (srfi srfi-71)
+ #:use-module (system foreign)
+ #:use-module (system base target)
+ #:use-module (rnrs bytevectors)
+ #:use-module (ice-9 q)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
+ #:export (set-thread-name
+ thread-name
+
+ make-worker-thread-channel
+ call-with-worker-thread
+
+ &worker-thread-timeout
+ worker-thread-timeout-error?
+
+ %worker-thread-default-timeout
+
+ create-work-queue))
+
+(define* (syscall->procedure return-type name argument-types
+ #:key library)
+ "Return a procedure that wraps the C function NAME using the dynamic FFI,
+and that returns two values: NAME's return value, and errno. When LIBRARY is
+specified, look up NAME in that library rather than in the global symbol name
+space.
+
+If an error occurs while creating the binding, defer the error report until
+the returned procedure is called."
+ (catch #t
+ (lambda ()
+ ;; Note: When #:library is set, try it first and fall back to libc
+ ;; proper. This is because libraries like libutil.so have been subsumed
+ ;; by libc.so with glibc >= 2.34.
+ (let ((ptr (dynamic-func name
+ (if library
+ (or (false-if-exception
+ (dynamic-link library))
+ (dynamic-link))
+ (dynamic-link)))))
+ ;; The #:return-errno? facility was introduced in Guile 2.0.12.
+ (pointer->procedure return-type ptr argument-types
+ #:return-errno? #t)))
+ (lambda args
+ (lambda _
+ (throw 'system-error name "~A" (list (strerror ENOSYS))
+ (list ENOSYS))))))
+
+(define %prctl
+ ;; Should it win the API contest against 'ioctl'? You tell us!
+ (syscall->procedure int "prctl"
+ (list int unsigned-long unsigned-long
+ unsigned-long unsigned-long)))
+
+(define PR_SET_NAME 15) ;<linux/prctl.h>
+(define PR_GET_NAME 16)
+(define PR_SET_CHILD_SUBREAPER 36)
+
+(define (set-child-subreaper!)
+ "Set the CHILD_SUBREAPER capability for the current process."
+ (%prctl PR_SET_CHILD_SUBREAPER 1 0 0 0))
+
+(define %max-thread-name-length
+ ;; Maximum length in bytes of the process name, including the terminating
+ ;; zero.
+ 16)
+
+(define (set-thread-name!/linux name)
+ "Set the name of the calling thread to NAME. NAME is truncated to 15
+bytes."
+ (let ((ptr (string->pointer name)))
+ (let ((ret
+ err
+ (%prctl PR_SET_NAME
+ (pointer-address ptr) 0 0 0)))
+ (unless (zero? ret)
+ (throw 'set-process-name "set-process-name"
+ "set-process-name: ~A"
+ (list (strerror err))
+ (list err))))))
+
+(define (bytes->string bytes)
+ "Read BYTES, a list of bytes, and return the null-terminated string decoded
+from there, or #f if that would be an empty string."
+ (match (take-while (negate zero?) bytes)
+ (()
+ #f)
+ (non-zero
+ (list->string (map integer->char non-zero)))))
+
+(define (thread-name/linux)
+ "Return the name of the calling thread as a string."
+ (let ((buf (make-bytevector %max-thread-name-length)))
+ (let ((ret
+ err
+ (%prctl PR_GET_NAME
+ (pointer-address (bytevector->pointer buf))
+ 0 0 0)))
+ (if (zero? ret)
+ (bytes->string (bytevector->u8-list buf))
+ (throw 'process-name "process-name"
+ "process-name: ~A"
+ (list (strerror err))
+ (list err))))))
+
+(define set-thread-name
+ (if (string-contains %host-type "linux")
+ set-thread-name!/linux
+ (const #f)))
+
+(define thread-name
+ (if (string-contains %host-type "linux")
+ thread-name/linux
+ (const "")))
+
+(define %worker-thread-args
+ (make-parameter #f))
+
+(define* (make-worker-thread-channel initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ (duration-logger (const #f))
+ destructor
+ lifetime
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
+ "Return a channel used to offload work to a dedicated thread. ARGS are the
+arguments of the worker thread procedure."
+ (define thread-proc-vector
+ (make-vector parallelism #f))
+
+ (define (initializer/safe)
+ (let ((args
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t)))
+
+ (if args
+ args
+ ;; never give up, just keep retrying
+ (begin
+ (sleep 1)
+ (initializer/safe)))))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ name
+ destructor
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 1)
+ (destructor/safe args)))))
+
+ (define (process thread-index channel args)
+ (let loop ((current-lifetime lifetime))
+ (let ((exception?
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
+ internal-time-units-per-second))
+
+ (let* ((start-time (get-internal-real-time))
+ (response
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (vector-set! thread-proc-vector
+ thread-index
+ proc)
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (start-stack
+ 'worker-thread
+ (apply proc args)))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t)))
+ (put-message reply
+ response)
+
+ (vector-set! thread-proc-vector
+ thread-index
+ #f)
+
+ (match response
+ (('worker-thread-error duration _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #t)
+ ((duration . _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #f))))))))
+ (unless (and expire-on-exception?
+ exception?)
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))))
+
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda (thread-index)
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append
+ name " w t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let init ((args (initializer/safe)))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "worker-thread-channel: exception: ~A\n" exn))
+ (lambda ()
+ (parameterize ((%worker-thread-args args))
+ (process thread-index channel args)))
+ #:unwind? #t)
+
+ (when destructor
+ (destructor/safe args))
+
+ (init (initializer/safe))))))
+ (iota parallelism))
+
+ (values channel
+ thread-proc-vector)))
+
+(define &worker-thread-timeout
+ (make-exception-type '&worker-thread-timeout
+ &error
+ '()))
+
+(define make-worker-thread-timeout-error
+ (record-constructor &worker-thread-timeout))
+
+(define worker-thread-timeout-error?
+ (record-predicate &worker-thread-timeout))
+
+(define %worker-thread-default-timeout
+ (make-parameter 30))
+
+(define* (call-with-worker-thread channel proc #:key duration-logger
+ (timeout (%worker-thread-default-timeout)))
+ "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
+If already in the worker thread, call PROC immediately."
+ (let ((args (%worker-thread-args)))
+ (if args
+ (apply proc args)
+ (let* ((reply (make-channel))
+ (operation-success?
+ (perform-operation
+ (let ((put
+ (wrap-operation
+ (put-operation channel
+ (list reply
+ (get-internal-real-time)
+ proc))
+ (const #t))))
+
+ (if timeout
+ (choice-operation
+ put
+ (wrap-operation (sleep-operation timeout)
+ (const #f)))
+ put)))))
+
+ (unless operation-success?
+ (raise-exception
+ (make-worker-thread-timeout-error)))
+
+ (match (get-message reply)
+ (('worker-thread-error duration exn)
+ (when duration-logger
+ (duration-logger duration))
+ (raise-exception exn))
+ ((duration . result)
+ (when duration-logger
+ (duration-logger duration))
+ (apply values result)))))))
+
+(define* (create-work-queue thread-count-parameter proc
+ #:key thread-start-delay
+ (thread-stop-delay
+ (make-time time-duration 0 0))
+ (name "unnamed")
+ priority<?)
+ (let ((queue (make-q))
+ (queue-mutex (make-mutex))
+ (job-available (make-condition-variable))
+ (running-job-args (make-hash-table)))
+
+ (define get-thread-count
+ (cond
+ ((number? thread-count-parameter)
+ (const thread-count-parameter))
+ ((eq? thread-count-parameter #f)
+ ;; Run one thread per job
+ (lambda ()
+ (+ (q-length queue)
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))))
+ (else
+ thread-count-parameter)))
+
+ (define process-job
+ (if priority<?
+ (lambda* (args #:key priority)
+ (with-mutex queue-mutex
+ (enq! queue (cons priority args))
+ (set-car!
+ queue
+ (stable-sort! (car queue)
+ (lambda (a b)
+ (priority<?
+ (car a)
+ (car b)))))
+ (sync-q! queue)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))
+ (lambda args
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))))
+
+ (define (count-threads)
+ (with-mutex queue-mutex
+ (hash-count (const #t) running-job-args)))
+
+ (define (count-jobs)
+ (with-mutex queue-mutex
+ (+ (q-length queue)
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))))
+
+ (define (list-jobs)
+ (with-mutex queue-mutex
+ (append (if priority<?
+ (map cdr (car queue))
+ (list-copy (car queue)))
+ (hash-fold (lambda (key val result)
+ (if val
+ (cons val result)
+ result))
+ '()
+ running-job-args))))
+
+ (define (thread-process-job job-args)
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "~A work queue, job raised exception ~A: ~A\n"
+ name job-args exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply proc job-args))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "~A work queue, exception when handling job: ~A ~A\n"
+ name key args)
+ (backtrace))))
+ #:unwind? #t))
+
+ (define (start-thread thread-index)
+ (define (too-many-threads?)
+ (let ((running-jobs-count
+ (hash-count (lambda (index val)
+ (list? val))
+ running-job-args))
+ (desired-thread-count (get-thread-count)))
+
+ (>= running-jobs-count
+ desired-thread-count)))
+
+ (define (thread-idle-for-too-long? last-job-finished-at)
+ (time>=?
+ (time-difference (current-time time-monotonic)
+ last-job-finished-at)
+ thread-stop-delay))
+
+ (define (stop-thread)
+ (hash-remove! running-job-args
+ thread-index)
+ (unlock-mutex queue-mutex))
+
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let loop ((last-job-finished-at (current-time time-monotonic)))
+ (lock-mutex queue-mutex)
+
+ (if (too-many-threads?)
+ (stop-thread)
+ (let ((job-args
+ (if (q-empty? queue)
+ ;; #f from wait-condition-variable indicates a timeout
+ (if (wait-condition-variable
+ job-available
+ queue-mutex
+ (+ 9 (time-second (current-time))))
+ ;; Another thread could have taken
+ ;; the job in the mean time
+ (if (q-empty? queue)
+ #f
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))
+ #f)
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))))
+
+ (if job-args
+ (begin
+ (hash-set! running-job-args
+ thread-index
+ job-args)
+
+ (unlock-mutex queue-mutex)
+ (thread-process-job job-args)
+
+ (with-mutex queue-mutex
+ (hash-set! running-job-args
+ thread-index
+ #f))
+
+ (loop (current-time time-monotonic)))
+ (if (thread-idle-for-too-long? last-job-finished-at)
+ (stop-thread)
+ (begin
+ (unlock-mutex queue-mutex)
+
+ (loop last-job-finished-at))))))))))
+
+
+ (define start-new-threads-if-necessary
+ (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
+ (lambda (desired-count)
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
+
+ (if (procedure? thread-count-parameter)
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t")))
+ (const #t))
+
+ (while #t
+ (sleep 15)
+ (with-mutex queue-mutex
+ (let ((idle-threads (hash-count (lambda (index val)
+ (eq? #f val))
+ running-job-args)))
+ (when (= 0 idle-threads)
+ (start-new-threads-if-necessary (get-thread-count))))))))
+ (start-new-threads-if-necessary (get-thread-count)))
+
+ (values process-job count-jobs count-threads list-jobs)))
diff --git a/pre-inst-env.in b/pre-inst-env.in
new file mode 100644
index 0000000..ebf1a05
--- /dev/null
+++ b/pre-inst-env.in
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+abs_top_srcdir="`cd "@abs_top_srcdir@" > /dev/null; pwd`"
+abs_top_builddir="`cd "@abs_top_builddir@" > /dev/null; pwd`"
+
+GUILE_LOAD_COMPILED_PATH="$abs_top_builddir${GUILE_LOAD_COMPILED_PATH:+:}$GUILE_LOAD_COMPILED_PATH"
+GUILE_LOAD_PATH="$abs_top_builddir:$abs_top_srcdir${GUILE_LOAD_PATH:+:}:$GUILE_LOAD_PATH"
+export GUILE_LOAD_COMPILED_PATH GUILE_LOAD_PATH
+
+PATH="$abs_top_builddir:$PATH"
+export PATH
+
+exec "$@"
diff --git a/tests.scm b/tests.scm
new file mode 100644
index 0000000..6c07374
--- /dev/null
+++ b/tests.scm
@@ -0,0 +1,28 @@
+(define-module (tests)
+ #:use-module (ice-9 exceptions)
+ #:use-module (fibers)
+ #:export (run-fibers-for-tests
+ assert-no-heap-growth))
+
+(define (run-fibers-for-tests thunk)
+ (let ((result
+ (run-fibers
+ (lambda ()
+ (with-exception-handler
+ (lambda (exn)
+ exn)
+ (lambda ()
+ (with-throw-handler #t
+ thunk
+ (lambda _
+ (backtrace)))
+ #t)
+ #:unwind? #t))
+ #:hz 0
+ #:parallelism 1)))
+ (if (exception? result)
+ (raise-exception result)
+ result)))
+
+(define (assert-no-heap-growth thunk)
+ (thunk))
diff --git a/tests/non-blocking.scm b/tests/non-blocking.scm
new file mode 100644
index 0000000..0f81f58
--- /dev/null
+++ b/tests/non-blocking.scm
@@ -0,0 +1,31 @@
+(use-modules (tests)
+ (fibers)
+ (unit-test)
+ (web uri)
+ (web client)
+ (web response)
+ (knots web-server)
+ (knots non-blocking))
+
+(run-fibers-for-tests
+ (lambda ()
+ (let* ((web-server
+ (run-knots-web-server
+ (lambda (request)
+ (values '((content-type . (text/plain)))
+ "Hello, World!"))
+ #:port 0)) ;; Bind to any port
+ (port
+ (web-server-port web-server))
+ (uri
+ (build-uri 'http #:host "127.0.0.1" #:port port)))
+
+
+ (assert-equal
+ 200
+ (response-code
+ (http-get
+ uri
+ #:port (nonblocking-open-socket-for-uri uri)))))))
+
+(display "non-blocking test finished successfully\n")
diff --git a/tests/parallelism.scm b/tests/parallelism.scm
new file mode 100644
index 0000000..8249d68
--- /dev/null
+++ b/tests/parallelism.scm
@@ -0,0 +1,15 @@
+(use-modules (tests)
+ (fibers)
+ (unit-test)
+ (knots parallelism))
+
+(run-fibers-for-tests
+ (lambda ()
+ (assert-equal
+ 1122
+ (apply + (fibers-map
+ (lambda (i)
+ (* 2 i))
+ (iota 34))))))
+
+(display "parallelism test finished successfully\n")
diff --git a/tests/promise.scm b/tests/promise.scm
new file mode 100644
index 0000000..b7dec73
--- /dev/null
+++ b/tests/promise.scm
@@ -0,0 +1,20 @@
+(use-modules (tests)
+ (fibers)
+ (unit-test)
+ (knots parallelism)
+ (knots promise))
+
+(run-fibers-for-tests
+ (lambda ()
+ (let ((promises
+ (map (lambda (i)
+ (fibers-delay
+ (lambda ()
+ (* i 2))))
+ (iota 10))))
+
+ (assert-equal
+ 90
+ (apply + (fibers-map fibers-force promises))))))
+
+(display "promise test finished successfully\n")
diff --git a/tests/queue.scm b/tests/queue.scm
new file mode 100644
index 0000000..c80e5fd
--- /dev/null
+++ b/tests/queue.scm
@@ -0,0 +1,22 @@
+(use-modules (tests)
+ (fibers)
+ (fibers channels)
+ (unit-test)
+ (knots queue))
+
+(run-fibers-for-tests
+ (lambda ()
+ (let* ((dest-channel
+ (make-channel))
+ (queue-channel
+ (spawn-queueing-fiber dest-channel)))
+
+ (put-message queue-channel 1)
+ (put-message queue-channel 2)
+ (put-message queue-channel 3)
+
+ (assert-equal 1 (get-message dest-channel))
+ (assert-equal 2 (get-message dest-channel))
+ (assert-equal 3 (get-message dest-channel)))))
+
+(display "queue test finished successfully\n")
diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm
new file mode 100644
index 0000000..ebd4682
--- /dev/null
+++ b/tests/resource-pool.scm
@@ -0,0 +1,18 @@
+(use-modules (tests)
+ (fibers)
+ (unit-test)
+ (knots resource-pool))
+
+(run-fibers-for-tests
+ (lambda ()
+ (let ((resource-pool (make-resource-pool
+ (lambda ()
+ 2)
+ 1)))
+ (assert-equal
+ (with-resource-from-pool resource-pool
+ res
+ res)
+ 2))))
+
+(display "resource-pool test finished successfully\n")
diff --git a/tests/timeout.scm b/tests/timeout.scm
new file mode 100644
index 0000000..bab39d2
--- /dev/null
+++ b/tests/timeout.scm
@@ -0,0 +1,22 @@
+(use-modules (tests)
+ (fibers)
+ (unit-test)
+ (knots timeout))
+
+(run-fibers-for-tests
+ (lambda ()
+ (assert-equal
+ 1
+ (with-fibers-timeout
+ (const 1)
+ #:timeout 10))
+
+ (assert-equal
+ 2
+ (with-fibers-timeout
+ (lambda ()
+ (sleep 10))
+ #:timeout 0.1
+ #:on-timeout (const 2)))))
+
+(display "timeout test finished successfully\n")
diff --git a/tests/web-server.scm b/tests/web-server.scm
new file mode 100644
index 0000000..5a76d0f
--- /dev/null
+++ b/tests/web-server.scm
@@ -0,0 +1,30 @@
+(use-modules (tests)
+ (fibers)
+ (unit-test)
+ (web uri)
+ (web client)
+ (web response)
+ (knots web-server)
+ (knots non-blocking))
+
+(run-fibers-for-tests
+ (lambda ()
+ (let* ((web-server
+ (run-knots-web-server
+ (lambda (request)
+ (values '((content-type . (text/plain)))
+ "Hello, World!"))
+ #:port 0)) ;; Bind to any port
+ (port
+ (web-server-port web-server))
+ (uri
+ (build-uri 'http #:host "127.0.0.1" #:port port)))
+
+ (assert-equal
+ 200
+ (response-code
+ (http-get
+ uri
+ #:port (nonblocking-open-socket-for-uri uri)))))))
+
+(display "web-server test finished successfully\n")
diff --git a/tests/worker-threads.scm b/tests/worker-threads.scm
new file mode 100644
index 0000000..72a56dc
--- /dev/null
+++ b/tests/worker-threads.scm
@@ -0,0 +1,32 @@
+(use-modules (tests)
+ (srfi srfi-71)
+ (fibers)
+ (unit-test)
+ (knots worker-threads))
+
+(let ((worker-thread-channel
+ (make-worker-thread-channel
+ (const '())
+ #:parallelism 2)))
+
+ (run-fibers-for-tests
+ (lambda ()
+ (assert-equal
+ (call-with-worker-thread
+ worker-thread-channel
+ (lambda ()
+ 4))
+ 4))))
+
+(let ((process-job
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue
+ 2
+ (lambda (i)
+ (* i 2)))))
+
+ (process-job 3))
+
+(display "worker-threads test finished successfully\n")