-rw-r--r--  .envrc                                 2
-rw-r--r--  COPYING                              661
-rw-r--r--  Makefile.am                            1
-rw-r--r--  guile.am                               2
-rw-r--r--  guix-dev.scm                           2
-rw-r--r--  nar-herder/cached-compression.scm    753
-rw-r--r--  nar-herder/database.scm             1006
-rw-r--r--  nar-herder/mirror.scm                 96
-rw-r--r--  nar-herder/recent-changes.scm        156
-rw-r--r--  nar-herder/server.scm                784
-rw-r--r--  nar-herder/storage.scm               719
-rw-r--r--  nar-herder/utils.scm                 837
-rw-r--r--  scripts/nar-herder.in                476
13 files changed, 4498 insertions, 997 deletions
diff --git a/.envrc b/.envrc
index 4e544a3..def7dec 100644
--- a/.envrc
+++ b/.envrc
@@ -2,7 +2,7 @@
export GUILE_LOAD_PATH=""
export GUILE_LOAD_COMPILED_PATH=""
-use guix --no-grafts -l guix-dev.scm
+use guix --no-grafts -D -f guix-dev.scm
export GUILE_LOAD_COMPILED_PATH="$PWD:$PWD/tests:$GUILE_LOAD_COMPILED_PATH"
export GUILE_LOAD_PATH="$PWD:$GUILE_LOAD_PATH"
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..dba13ed
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,661 @@
+ GNU AFFERO GENERAL PUBLIC LICENSE
+ Version 3, 19 November 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The GNU Affero General Public License is a free, copyleft license for
+software and other kinds of works, specifically designed to ensure
+cooperation with the community in the case of network server software.
+
+ The licenses for most software and other practical works are designed
+to take away your freedom to share and change the works. By contrast,
+our General Public Licenses are intended to guarantee your freedom to
+share and change all versions of a program--to make sure it remains free
+software for all its users.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+them if you wish), that you receive source code or can get it if you
+want it, that you can change the software or use pieces of it in new
+free programs, and that you know you can do these things.
+
+ Developers that use our General Public Licenses protect your rights
+with two steps: (1) assert copyright on the software, and (2) offer
+you this License which gives you legal permission to copy, distribute
+and/or modify the software.
+
+ A secondary benefit of defending all users' freedom is that
+improvements made in alternate versions of the program, if they
+receive widespread use, become available for other developers to
+incorporate. Many developers of free software are heartened and
+encouraged by the resulting cooperation. However, in the case of
+software used on network servers, this result may fail to come about.
+The GNU General Public License permits making a modified version and
+letting the public access it on a server without ever releasing its
+source code to the public.
+
+ The GNU Affero General Public License is designed specifically to
+ensure that, in such cases, the modified source code becomes available
+to the community. It requires the operator of a network server to
+provide the source code of the modified version running there to the
+users of that server. Therefore, public use of a modified version, on
+a publicly accessible server, gives the public access to the source
+code of the modified version.
+
+ An older license, called the Affero General Public License and
+published by Affero, was designed to accomplish similar goals. This is
+a different license, not a version of the Affero GPL, but Affero has
+released a new version of the Affero GPL which permits relicensing under
+this license.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ TERMS AND CONDITIONS
+
+ 0. Definitions.
+
+ "This License" refers to version 3 of the GNU Affero General Public License.
+
+ "Copyright" also means copyright-like laws that apply to other kinds of
+works, such as semiconductor masks.
+
+ "The Program" refers to any copyrightable work licensed under this
+License. Each licensee is addressed as "you". "Licensees" and
+"recipients" may be individuals or organizations.
+
+ To "modify" a work means to copy from or adapt all or part of the work
+in a fashion requiring copyright permission, other than the making of an
+exact copy. The resulting work is called a "modified version" of the
+earlier work or a work "based on" the earlier work.
+
+ A "covered work" means either the unmodified Program or a work based
+on the Program.
+
+ To "propagate" a work means to do anything with it that, without
+permission, would make you directly or secondarily liable for
+infringement under applicable copyright law, except executing it on a
+computer or modifying a private copy. Propagation includes copying,
+distribution (with or without modification), making available to the
+public, and in some countries other activities as well.
+
+ To "convey" a work means any kind of propagation that enables other
+parties to make or receive copies. Mere interaction with a user through
+a computer network, with no transfer of a copy, is not conveying.
+
+ An interactive user interface displays "Appropriate Legal Notices"
+to the extent that it includes a convenient and prominently visible
+feature that (1) displays an appropriate copyright notice, and (2)
+tells the user that there is no warranty for the work (except to the
+extent that warranties are provided), that licensees may convey the
+work under this License, and how to view a copy of this License. If
+the interface presents a list of user commands or options, such as a
+menu, a prominent item in the list meets this criterion.
+
+ 1. Source Code.
+
+ The "source code" for a work means the preferred form of the work
+for making modifications to it. "Object code" means any non-source
+form of a work.
+
+ A "Standard Interface" means an interface that either is an official
+standard defined by a recognized standards body, or, in the case of
+interfaces specified for a particular programming language, one that
+is widely used among developers working in that language.
+
+ The "System Libraries" of an executable work include anything, other
+than the work as a whole, that (a) is included in the normal form of
+packaging a Major Component, but which is not part of that Major
+Component, and (b) serves only to enable use of the work with that
+Major Component, or to implement a Standard Interface for which an
+implementation is available to the public in source code form. A
+"Major Component", in this context, means a major essential component
+(kernel, window system, and so on) of the specific operating system
+(if any) on which the executable work runs, or a compiler used to
+produce the work, or an object code interpreter used to run it.
+
+ The "Corresponding Source" for a work in object code form means all
+the source code needed to generate, install, and (for an executable
+work) run the object code and to modify the work, including scripts to
+control those activities. However, it does not include the work's
+System Libraries, or general-purpose tools or generally available free
+programs which are used unmodified in performing those activities but
+which are not part of the work. For example, Corresponding Source
+includes interface definition files associated with source files for
+the work, and the source code for shared libraries and dynamically
+linked subprograms that the work is specifically designed to require,
+such as by intimate data communication or control flow between those
+subprograms and other parts of the work.
+
+ The Corresponding Source need not include anything that users
+can regenerate automatically from other parts of the Corresponding
+Source.
+
+ The Corresponding Source for a work in source code form is that
+same work.
+
+ 2. Basic Permissions.
+
+ All rights granted under this License are granted for the term of
+copyright on the Program, and are irrevocable provided the stated
+conditions are met. This License explicitly affirms your unlimited
+permission to run the unmodified Program. The output from running a
+covered work is covered by this License only if the output, given its
+content, constitutes a covered work. This License acknowledges your
+rights of fair use or other equivalent, as provided by copyright law.
+
+ You may make, run and propagate covered works that you do not
+convey, without conditions so long as your license otherwise remains
+in force. You may convey covered works to others for the sole purpose
+of having them make modifications exclusively for you, or provide you
+with facilities for running those works, provided that you comply with
+the terms of this License in conveying all material for which you do
+not control copyright. Those thus making or running the covered works
+for you must do so exclusively on your behalf, under your direction
+and control, on terms that prohibit them from making any copies of
+your copyrighted material outside their relationship with you.
+
+ Conveying under any other circumstances is permitted solely under
+the conditions stated below. Sublicensing is not allowed; section 10
+makes it unnecessary.
+
+ 3. Protecting Users' Legal Rights From Anti-Circumvention Law.
+
+ No covered work shall be deemed part of an effective technological
+measure under any applicable law fulfilling obligations under article
+11 of the WIPO copyright treaty adopted on 20 December 1996, or
+similar laws prohibiting or restricting circumvention of such
+measures.
+
+ When you convey a covered work, you waive any legal power to forbid
+circumvention of technological measures to the extent such circumvention
+is effected by exercising rights under this License with respect to
+the covered work, and you disclaim any intention to limit operation or
+modification of the work as a means of enforcing, against the work's
+users, your or third parties' legal rights to forbid circumvention of
+technological measures.
+
+ 4. Conveying Verbatim Copies.
+
+ You may convey verbatim copies of the Program's source code as you
+receive it, in any medium, provided that you conspicuously and
+appropriately publish on each copy an appropriate copyright notice;
+keep intact all notices stating that this License and any
+non-permissive terms added in accord with section 7 apply to the code;
+keep intact all notices of the absence of any warranty; and give all
+recipients a copy of this License along with the Program.
+
+ You may charge any price or no price for each copy that you convey,
+and you may offer support or warranty protection for a fee.
+
+ 5. Conveying Modified Source Versions.
+
+ You may convey a work based on the Program, or the modifications to
+produce it from the Program, in the form of source code under the
+terms of section 4, provided that you also meet all of these conditions:
+
+ a) The work must carry prominent notices stating that you modified
+ it, and giving a relevant date.
+
+ b) The work must carry prominent notices stating that it is
+ released under this License and any conditions added under section
+ 7. This requirement modifies the requirement in section 4 to
+ "keep intact all notices".
+
+ c) You must license the entire work, as a whole, under this
+ License to anyone who comes into possession of a copy. This
+ License will therefore apply, along with any applicable section 7
+ additional terms, to the whole of the work, and all its parts,
+ regardless of how they are packaged. This License gives no
+ permission to license the work in any other way, but it does not
+ invalidate such permission if you have separately received it.
+
+ d) If the work has interactive user interfaces, each must display
+ Appropriate Legal Notices; however, if the Program has interactive
+ interfaces that do not display Appropriate Legal Notices, your
+ work need not make them do so.
+
+ A compilation of a covered work with other separate and independent
+works, which are not by their nature extensions of the covered work,
+and which are not combined with it such as to form a larger program,
+in or on a volume of a storage or distribution medium, is called an
+"aggregate" if the compilation and its resulting copyright are not
+used to limit the access or legal rights of the compilation's users
+beyond what the individual works permit. Inclusion of a covered work
+in an aggregate does not cause this License to apply to the other
+parts of the aggregate.
+
+ 6. Conveying Non-Source Forms.
+
+ You may convey a covered work in object code form under the terms
+of sections 4 and 5, provided that you also convey the
+machine-readable Corresponding Source under the terms of this License,
+in one of these ways:
+
+ a) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by the
+ Corresponding Source fixed on a durable physical medium
+ customarily used for software interchange.
+
+ b) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by a
+ written offer, valid for at least three years and valid for as
+ long as you offer spare parts or customer support for that product
+ model, to give anyone who possesses the object code either (1) a
+ copy of the Corresponding Source for all the software in the
+ product that is covered by this License, on a durable physical
+ medium customarily used for software interchange, for a price no
+ more than your reasonable cost of physically performing this
+ conveying of source, or (2) access to copy the
+ Corresponding Source from a network server at no charge.
+
+ c) Convey individual copies of the object code with a copy of the
+ written offer to provide the Corresponding Source. This
+ alternative is allowed only occasionally and noncommercially, and
+ only if you received the object code with such an offer, in accord
+ with subsection 6b.
+
+ d) Convey the object code by offering access from a designated
+ place (gratis or for a charge), and offer equivalent access to the
+ Corresponding Source in the same way through the same place at no
+ further charge. You need not require recipients to copy the
+ Corresponding Source along with the object code. If the place to
+ copy the object code is a network server, the Corresponding Source
+ may be on a different server (operated by you or a third party)
+ that supports equivalent copying facilities, provided you maintain
+ clear directions next to the object code saying where to find the
+ Corresponding Source. Regardless of what server hosts the
+ Corresponding Source, you remain obligated to ensure that it is
+ available for as long as needed to satisfy these requirements.
+
+ e) Convey the object code using peer-to-peer transmission, provided
+ you inform other peers where the object code and Corresponding
+ Source of the work are being offered to the general public at no
+ charge under subsection 6d.
+
+ A separable portion of the object code, whose source code is excluded
+from the Corresponding Source as a System Library, need not be
+included in conveying the object code work.
+
+ A "User Product" is either (1) a "consumer product", which means any
+tangible personal property which is normally used for personal, family,
+or household purposes, or (2) anything designed or sold for incorporation
+into a dwelling. In determining whether a product is a consumer product,
+doubtful cases shall be resolved in favor of coverage. For a particular
+product received by a particular user, "normally used" refers to a
+typical or common use of that class of product, regardless of the status
+of the particular user or of the way in which the particular user
+actually uses, or expects or is expected to use, the product. A product
+is a consumer product regardless of whether the product has substantial
+commercial, industrial or non-consumer uses, unless such uses represent
+the only significant mode of use of the product.
+
+ "Installation Information" for a User Product means any methods,
+procedures, authorization keys, or other information required to install
+and execute modified versions of a covered work in that User Product from
+a modified version of its Corresponding Source. The information must
+suffice to ensure that the continued functioning of the modified object
+code is in no case prevented or interfered with solely because
+modification has been made.
+
+ If you convey an object code work under this section in, or with, or
+specifically for use in, a User Product, and the conveying occurs as
+part of a transaction in which the right of possession and use of the
+User Product is transferred to the recipient in perpetuity or for a
+fixed term (regardless of how the transaction is characterized), the
+Corresponding Source conveyed under this section must be accompanied
+by the Installation Information. But this requirement does not apply
+if neither you nor any third party retains the ability to install
+modified object code on the User Product (for example, the work has
+been installed in ROM).
+
+ The requirement to provide Installation Information does not include a
+requirement to continue to provide support service, warranty, or updates
+for a work that has been modified or installed by the recipient, or for
+the User Product in which it has been modified or installed. Access to a
+network may be denied when the modification itself materially and
+adversely affects the operation of the network or violates the rules and
+protocols for communication across the network.
+
+ Corresponding Source conveyed, and Installation Information provided,
+in accord with this section must be in a format that is publicly
+documented (and with an implementation available to the public in
+source code form), and must require no special password or key for
+unpacking, reading or copying.
+
+ 7. Additional Terms.
+
+ "Additional permissions" are terms that supplement the terms of this
+License by making exceptions from one or more of its conditions.
+Additional permissions that are applicable to the entire Program shall
+be treated as though they were included in this License, to the extent
+that they are valid under applicable law. If additional permissions
+apply only to part of the Program, that part may be used separately
+under those permissions, but the entire Program remains governed by
+this License without regard to the additional permissions.
+
+ When you convey a copy of a covered work, you may at your option
+remove any additional permissions from that copy, or from any part of
+it. (Additional permissions may be written to require their own
+removal in certain cases when you modify the work.) You may place
+additional permissions on material, added by you to a covered work,
+for which you have or can give appropriate copyright permission.
+
+ Notwithstanding any other provision of this License, for material you
+add to a covered work, you may (if authorized by the copyright holders of
+that material) supplement the terms of this License with terms:
+
+ a) Disclaiming warranty or limiting liability differently from the
+ terms of sections 15 and 16 of this License; or
+
+ b) Requiring preservation of specified reasonable legal notices or
+ author attributions in that material or in the Appropriate Legal
+ Notices displayed by works containing it; or
+
+ c) Prohibiting misrepresentation of the origin of that material, or
+ requiring that modified versions of such material be marked in
+ reasonable ways as different from the original version; or
+
+ d) Limiting the use for publicity purposes of names of licensors or
+ authors of the material; or
+
+ e) Declining to grant rights under trademark law for use of some
+ trade names, trademarks, or service marks; or
+
+ f) Requiring indemnification of licensors and authors of that
+ material by anyone who conveys the material (or modified versions of
+ it) with contractual assumptions of liability to the recipient, for
+ any liability that these contractual assumptions directly impose on
+ those licensors and authors.
+
+ All other non-permissive additional terms are considered "further
+restrictions" within the meaning of section 10. If the Program as you
+received it, or any part of it, contains a notice stating that it is
+governed by this License along with a term that is a further
+restriction, you may remove that term. If a license document contains
+a further restriction but permits relicensing or conveying under this
+License, you may add to a covered work material governed by the terms
+of that license document, provided that the further restriction does
+not survive such relicensing or conveying.
+
+ If you add terms to a covered work in accord with this section, you
+must place, in the relevant source files, a statement of the
+additional terms that apply to those files, or a notice indicating
+where to find the applicable terms.
+
+ Additional terms, permissive or non-permissive, may be stated in the
+form of a separately written license, or stated as exceptions;
+the above requirements apply either way.
+
+ 8. Termination.
+
+ You may not propagate or modify a covered work except as expressly
+provided under this License. Any attempt otherwise to propagate or
+modify it is void, and will automatically terminate your rights under
+this License (including any patent licenses granted under the third
+paragraph of section 11).
+
+ However, if you cease all violation of this License, then your
+license from a particular copyright holder is reinstated (a)
+provisionally, unless and until the copyright holder explicitly and
+finally terminates your license, and (b) permanently, if the copyright
+holder fails to notify you of the violation by some reasonable means
+prior to 60 days after the cessation.
+
+ Moreover, your license from a particular copyright holder is
+reinstated permanently if the copyright holder notifies you of the
+violation by some reasonable means, this is the first time you have
+received notice of violation of this License (for any work) from that
+copyright holder, and you cure the violation prior to 30 days after
+your receipt of the notice.
+
+ Termination of your rights under this section does not terminate the
+licenses of parties who have received copies or rights from you under
+this License. If your rights have been terminated and not permanently
+reinstated, you do not qualify to receive new licenses for the same
+material under section 10.
+
+ 9. Acceptance Not Required for Having Copies.
+
+ You are not required to accept this License in order to receive or
+run a copy of the Program. Ancillary propagation of a covered work
+occurring solely as a consequence of using peer-to-peer transmission
+to receive a copy likewise does not require acceptance. However,
+nothing other than this License grants you permission to propagate or
+modify any covered work. These actions infringe copyright if you do
+not accept this License. Therefore, by modifying or propagating a
+covered work, you indicate your acceptance of this License to do so.
+
+ 10. Automatic Licensing of Downstream Recipients.
+
+ Each time you convey a covered work, the recipient automatically
+receives a license from the original licensors, to run, modify and
+propagate that work, subject to this License. You are not responsible
+for enforcing compliance by third parties with this License.
+
+ An "entity transaction" is a transaction transferring control of an
+organization, or substantially all assets of one, or subdividing an
+organization, or merging organizations. If propagation of a covered
+work results from an entity transaction, each party to that
+transaction who receives a copy of the work also receives whatever
+licenses to the work the party's predecessor in interest had or could
+give under the previous paragraph, plus a right to possession of the
+Corresponding Source of the work from the predecessor in interest, if
+the predecessor has it or can get it with reasonable efforts.
+
+ You may not impose any further restrictions on the exercise of the
+rights granted or affirmed under this License. For example, you may
+not impose a license fee, royalty, or other charge for exercise of
+rights granted under this License, and you may not initiate litigation
+(including a cross-claim or counterclaim in a lawsuit) alleging that
+any patent claim is infringed by making, using, selling, offering for
+sale, or importing the Program or any portion of it.
+
+ 11. Patents.
+
+ A "contributor" is a copyright holder who authorizes use under this
+License of the Program or a work on which the Program is based. The
+work thus licensed is called the contributor's "contributor version".
+
+ A contributor's "essential patent claims" are all patent claims
+owned or controlled by the contributor, whether already acquired or
+hereafter acquired, that would be infringed by some manner, permitted
+by this License, of making, using, or selling its contributor version,
+but do not include claims that would be infringed only as a
+consequence of further modification of the contributor version. For
+purposes of this definition, "control" includes the right to grant
+patent sublicenses in a manner consistent with the requirements of
+this License.
+
+ Each contributor grants you a non-exclusive, worldwide, royalty-free
+patent license under the contributor's essential patent claims, to
+make, use, sell, offer for sale, import and otherwise run, modify and
+propagate the contents of its contributor version.
+
+ In the following three paragraphs, a "patent license" is any express
+agreement or commitment, however denominated, not to enforce a patent
+(such as an express permission to practice a patent or covenant not to
+sue for patent infringement). To "grant" such a patent license to a
+party means to make such an agreement or commitment not to enforce a
+patent against the party.
+
+ If you convey a covered work, knowingly relying on a patent license,
+and the Corresponding Source of the work is not available for anyone
+to copy, free of charge and under the terms of this License, through a
+publicly available network server or other readily accessible means,
+then you must either (1) cause the Corresponding Source to be so
+available, or (2) arrange to deprive yourself of the benefit of the
+patent license for this particular work, or (3) arrange, in a manner
+consistent with the requirements of this License, to extend the patent
+license to downstream recipients. "Knowingly relying" means you have
+actual knowledge that, but for the patent license, your conveying the
+covered work in a country, or your recipient's use of the covered work
+in a country, would infringe one or more identifiable patents in that
+country that you have reason to believe are valid.
+
+ If, pursuant to or in connection with a single transaction or
+arrangement, you convey, or propagate by procuring conveyance of, a
+covered work, and grant a patent license to some of the parties
+receiving the covered work authorizing them to use, propagate, modify
+or convey a specific copy of the covered work, then the patent license
+you grant is automatically extended to all recipients of the covered
+work and works based on it.
+
+ A patent license is "discriminatory" if it does not include within
+the scope of its coverage, prohibits the exercise of, or is
+conditioned on the non-exercise of one or more of the rights that are
+specifically granted under this License. You may not convey a covered
+work if you are a party to an arrangement with a third party that is
+in the business of distributing software, under which you make payment
+to the third party based on the extent of your activity of conveying
+the work, and under which the third party grants, to any of the
+parties who would receive the covered work from you, a discriminatory
+patent license (a) in connection with copies of the covered work
+conveyed by you (or copies made from those copies), or (b) primarily
+for and in connection with specific products or compilations that
+contain the covered work, unless you entered into that arrangement,
+or that patent license was granted, prior to 28 March 2007.
+
+ Nothing in this License shall be construed as excluding or limiting
+any implied license or other defenses to infringement that may
+otherwise be available to you under applicable patent law.
+
+ 12. No Surrender of Others' Freedom.
+
+ If conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot convey a
+covered work so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you may
+not convey it at all. For example, if you agree to terms that obligate you
+to collect a royalty for further conveying from those to whom you convey
+the Program, the only way you could satisfy both those terms and this
+License would be to refrain entirely from conveying the Program.
+
+ 13. Remote Network Interaction; Use with the GNU General Public License.
+
+ Notwithstanding any other provision of this License, if you modify the
+Program, your modified version must prominently offer all users
+interacting with it remotely through a computer network (if your version
+supports such interaction) an opportunity to receive the Corresponding
+Source of your version by providing access to the Corresponding Source
+from a network server at no charge, through some standard or customary
+means of facilitating copying of software. This Corresponding Source
+shall include the Corresponding Source for any work covered by version 3
+of the GNU General Public License that is incorporated pursuant to the
+following paragraph.
+
+ Notwithstanding any other provision of this License, you have
+permission to link or combine any covered work with a work licensed
+under version 3 of the GNU General Public License into a single
+combined work, and to convey the resulting work. The terms of this
+License will continue to apply to the part which is the covered work,
+but the work with which it is combined will remain governed by version
+3 of the GNU General Public License.
+
+ 14. Revised Versions of this License.
+
+ The Free Software Foundation may publish revised and/or new versions of
+the GNU Affero General Public License from time to time. Such new versions
+will be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+ Each version is given a distinguishing version number. If the
+Program specifies that a certain numbered version of the GNU Affero General
+Public License "or any later version" applies to it, you have the
+option of following the terms and conditions either of that numbered
+version or of any later version published by the Free Software
+Foundation. If the Program does not specify a version number of the
+GNU Affero General Public License, you may choose any version ever published
+by the Free Software Foundation.
+
+ If the Program specifies that a proxy can decide which future
+versions of the GNU Affero General Public License can be used, that proxy's
+public statement of acceptance of a version permanently authorizes you
+to choose that version for the Program.
+
+ Later license versions may give you additional or different
+permissions. However, no additional obligations are imposed on any
+author or copyright holder as a result of your choosing to follow a
+later version.
+
+ 15. Disclaimer of Warranty.
+
+ THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
+APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
+HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
+OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
+IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
+ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+ 16. Limitation of Liability.
+
+ IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
+THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
+GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
+USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
+DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
+PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
+EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGES.
+
+ 17. Interpretation of Sections 15 and 16.
+
+ If the disclaimer of warranty and limitation of liability provided
+above cannot be given local legal effect according to their terms,
+reviewing courts shall apply local law that most closely approximates
+an absolute waiver of all civil liability in connection with the
+Program, unless a warranty or assumption of liability accompanies a
+copy of the Program in return for a fee.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Programs
+
+ If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+ To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+state the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+ <one line to give the program's name and a brief idea of what it does.>
+ Copyright (C) <year> <name of author>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+Also add information on how to contact you by electronic and paper mail.
+
+ If your software can interact with users remotely through a computer
+network, you should also make sure that it provides a way for users to
+get its source. For example, if your program is a web application, its
+interface could display a "Source" link that leads users to an archive
+of the code. There are many ways you could offer source, and different
+solutions will be better for different programs; see section 13 for the
+specific requirements.
+
+ You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU AGPL, see
+<http://www.gnu.org/licenses/>.
diff --git a/Makefile.am b/Makefile.am
index 7ac77d7..d2617ab 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -7,6 +7,7 @@ SOURCES = \
nar-herder/database.scm \
nar-herder/server.scm \
nar-herder/recent-changes.scm \
+ nar-herder/cached-compression.scm \
nar-herder/storage.scm \
nar-herder/mirror.scm \
nar-herder/utils.scm
diff --git a/guile.am b/guile.am
index 7f07ca2..ca782a3 100644
--- a/guile.am
+++ b/guile.am
@@ -19,4 +19,4 @@ EXTRA_DIST = $(SOURCES) $(NOCOMP_SOURCES)
GUILE_WARNINGS = -Wunbound-variable -Warity-mismatch -Wformat
SUFFIXES = .scm .go
.scm.go:
- $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<"
+ $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile -W3 $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<"
diff --git a/guix-dev.scm b/guix-dev.scm
index bd6828f..61e282c 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -53,7 +53,7 @@
(inputs
`(("guix" ,guix)
("guile-json" ,guile-json-4)
- ("guile-fibers" ,guile-fibers-1.1)
+ ("guile-fibers" ,guile-fibers-1.3)
("guile-gcrypt" ,guile-gcrypt)
("guile-readline" ,guile-readline)
("guile-lzlib" ,guile-lzlib)
diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm
new file mode 100644
index 0000000..5c257dc
--- /dev/null
+++ b/nar-herder/cached-compression.scm
@@ -0,0 +1,753 @@
+;;; Nar Herder
+;;;
+;;; Copyright © 2022, 2023 Christopher Baines <mail@cbaines.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (nar-herder cached-compression)
+ #:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-19)
+ #:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 ftw)
+ #:use-module (ice-9 match)
+ #:use-module (ice-9 atomic)
+ #:use-module (ice-9 threads)
+ #:use-module (logging logger)
+ #:use-module (prometheus)
+ #:use-module (fibers)
+ #:use-module (fibers timers)
+ #:use-module (fibers channels)
+ #:use-module (fibers operations)
+ #:use-module (web uri)
+ #:use-module (web client)
+ #:use-module (web response)
+ #:use-module (guix store)
+ #:use-module ((guix utils) #:select (compressed-file?
+ call-with-decompressed-port))
+ #:use-module ((guix build utils)
+ #:select (dump-port mkdir-p))
+ #:use-module (nar-herder utils)
+ #:use-module (nar-herder database)
+ #:export (start-cached-compression-management-fiber
+ start-cached-compression-removal-fiber
+ start-cached-compression-schedule-removal-fiber))
+
+;; Nar caching overview
+;;
+;; On start
+;; - Compute the size of each cached compression directory
+;; - Remove database entries if they're missing from the directory
+;; - Remove files if they're missing from the database
+;;
+;; On nar usage
+;; - Bump count
+;; - If count is sufficient
+;; - Trigger generation of the cached nar
+;; - Skip if the work queue already includes this job
+;; - At the start of the job, check for a database entry and exit
+;;       early if one exists, bumping the atime of the file
+;; - If the file doesn't exist, check if there's free space
+;; - If there's free space, create the file
+;;
+;; Nar removal fiber
+;; - Periodically remove nars that have been scheduled for removal
+;; (when the scheduled time passes)
+;;
+;; Nar schedule removal fiber
+;; - Periodically check for nars that haven't been accessed in some
+;; time and schedule them for removal
+
+(define (perform-cached-compression-startup database
+ enabled-cached-compressions
+ nar-cache-files)
+ (let* ((database-entries-missing-files
+ ;; List tracking entries in the database for cached files,
+ ;; where the file is missing from the disk.
+ ;;
+ ;; These database entries will be removed at the end of the
+         ;; startup process.
+ '())
+
+ ;; alist of compression to hash table of files
+ ;;
+ ;; Entries from the hash tables will be removed when the
+ ;; database entry is processed below, so these hash tables
+ ;; will be left reflecting files in the directories, but with
+ ;; no entry in the database.
+ ;;
+ ;; These files will be deleted at the end of the startup
+ ;; process.
+ (files-by-compression
+ (map
+ (match-lambda
+ ((compression . details)
+ (let ((result (make-hash-table)))
+ (for-each
+ (lambda (file)
+ (hash-set! result file #t))
+ (scandir (assq-ref details 'directory)
+ (negate (cut member <> '("." "..")))))
+ (cons compression
+ result))))
+ enabled-cached-compressions))
+
+ (cached-bytes-by-compression
+ (database-fold-cached-narinfo-files
+ database
+ (lambda (details result)
+ (let ((compression
+ (assq-ref details 'compression))
+ (filename
+ (store-path-base
+ (assq-ref details 'store-path))))
+
+ (let ((files-hash
+ (assq-ref files-by-compression compression)))
+ (if (hash-ref files-hash filename)
+ (begin
+ (hash-remove! files-hash filename)
+
+ (metric-increment nar-cache-files
+ #:label-values
+ `((compression . ,compression)))
+
+ `((,compression . ,(+ (assq-ref details 'size)
+ (or (assq-ref result compression)
+ 0)))
+ ,@(alist-delete compression result)))
+
+ ;; Database entry, but file missing
+ (begin
+ (set! database-entries-missing-files
+ (cons details
+ database-entries-missing-files))
+ result)))))
+ (map
+ (lambda (compression)
+ (cons compression 0))
+ (map car enabled-cached-compressions)))))
+
+ ;; Delete cached files with no database entries
+ (for-each
+ (match-lambda
+ ((compression . hash-table)
+ (let ((count (hash-count (const #t)
+ hash-table)))
+ (unless (= 0 count)
+ (let ((directory
+ (assq-ref
+ (assq-ref enabled-cached-compressions compression)
+ 'directory)))
+ (simple-format #t "deleting ~A cached files from ~A\n"
+ count
+ directory)
+ (hash-for-each
+ (lambda (filename _)
+ (delete-file (string-append directory "/" filename)))
+ hash-table))))))
+ files-by-compression)
+
+ ;; Delete database entries where the file is missing
+ (let ((count (length database-entries-missing-files)))
+ (unless (= 0 count)
+ (simple-format
+ #t
+ "deleting ~A cached_narinfo_files entries due to missing files\n"
+ count)
+
+ (for-each
+ (lambda (details)
+ (database-remove-cached-narinfo-file database
+ (assq-ref details 'narinfo-id)
+ (symbol->string
+ (assq-ref details 'compression))))
+ database-entries-missing-files)))
+
+ cached-bytes-by-compression))
+
+;; This fiber manages metadata around cached compressions, and
+;; delegates tasks to the thread pool to generate newly compressed
+;; nars
+(define* (start-cached-compression-management-fiber
+ database
+ metrics-registry
+ nar-source
+ enabled-cached-compressions
+ cached-compression-min-uses
+ #:key (cached-compression-workers 2)
+ scheduler)
+
+ (define nar-cache-bytes-metric
+ (make-gauge-metric metrics-registry
+ "nar_cache_size_bytes"
+ #:labels '(compression)))
+
+ (define nar-cache-files
+ (make-gauge-metric metrics-registry
+ "nar_cache_files"
+ #:labels '(compression)))
+
+ (define channel
+ (make-channel))
+
+ (let ((process-job
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue cached-compression-workers
+ (lambda (thunk)
+ (thunk))
+ #:name "cached compression")))
+
+ (define (consider-narinfo cached-bytes-by-compression
+ usage-hash-table
+ narinfo-id)
+ (let* ((existing-cached-files
+ ;; This is important both to avoid trying to create
+             ;; files twice, and to handle the case where additional
+ ;; compressions are enabled and more files need to be
+ ;; generated
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (existing-compressions
+ (map (lambda (details)
+ (assq-ref details 'compression))
+ existing-cached-files))
+ (missing-compressions
+ (lset-difference eq?
+ (map car enabled-cached-compressions)
+ existing-compressions))
+ (narinfo-files
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+
+ (compress-file?
+ (let ((url (assq-ref (first narinfo-files) 'url)))
+ ;; TODO: Maybe this should be configurable?
+ (not (compressed-file? url)))))
+
+ (when (and compress-file?
+ (not (null? missing-compressions)))
+ (let ((new-count
+ (let ((val (+ 1
+ (or (hash-ref usage-hash-table narinfo-id)
+ 0))))
+ (hash-set! usage-hash-table
+ narinfo-id
+ val)
+ val)))
+
+ (when (and (> new-count
+ cached-compression-min-uses))
+ (let* ((narinfo-details
+ (database-select-narinfo database narinfo-id))
+ (nar-size
+ (assq-ref narinfo-details 'nar-size))
+ (compressions-with-space
+ (filter
+ (lambda (compression)
+ (let ((directory-max-size
+ (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'directory-max-size))
+ (current-directory-size
+ (assq-ref cached-bytes-by-compression
+ compression)))
+
+ (unless current-directory-size
+ (log-msg 'ERROR "current-directory-size unset: "
+ current-directory-size))
+ (unless nar-size
+ (log-msg 'ERROR "nar-size unset: "
+ nar-size))
+
+ (if directory-max-size
+ (< (+ current-directory-size
+                                 ;; Assume the compressed nar could be
+ ;; as big as the uncompressed nar
+ nar-size)
+ directory-max-size)
+ #t)))
+ missing-compressions)))
+ (for-each
+ (lambda (compression)
+ (spawn-fiber
+ (lambda ()
+ (process-job
+ (lambda ()
+ (let ((new-bytes
+ (make-compressed-nar
+ narinfo-files
+ nar-source
+ enabled-cached-compressions
+ compression
+ #:level (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'level))))
+ (put-message channel
+ (list 'cached-narinfo-added
+ narinfo-id
+ compression
+ new-bytes
+ #f))))))))
+ compressions-with-space)))))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((initial-cached-bytes-by-compression
+ (perform-cached-compression-startup
+ database
+ enabled-cached-compressions
+ nar-cache-files))
+ (nar-cached-compression-usage-hash-table
+ (make-hash-table 65536)))
+
+ (for-each
+ (match-lambda
+ ((compression . bytes)
+ (metric-set
+ nar-cache-bytes-metric
+ bytes
+ #:label-values `((compression . ,compression)))))
+ initial-cached-bytes-by-compression)
+
+ (let loop ((cached-bytes-by-compression
+ initial-cached-bytes-by-compression))
+ (match (get-message channel)
+ (('narinfo-id . narinfo-id)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception considering narinfo ("
+ narinfo-id "): " exn)
+ #f)
+ (lambda ()
+ (consider-narinfo cached-bytes-by-compression
+ nar-cached-compression-usage-hash-table
+ narinfo-id))
+ #:unwind? #t)
+ (loop cached-bytes-by-compression))
+
+ (((and (or 'cached-narinfo-added 'cached-narinfo-removed)
+ action)
+ narinfo-id compression size reply)
+ (let ((updated-bytes
+ ((if (eq? action 'cached-narinfo-added)
+ +
+ -)
+ (or (assq-ref cached-bytes-by-compression
+ compression)
+ 0)
+ size)))
+
+ (metric-set
+ nar-cache-bytes-metric
+ updated-bytes
+ #:label-values `((compression . ,compression)))
+
+ ((if (eq? action 'cached-narinfo-added)
+ metric-increment
+ metric-decrement)
+ nar-cache-files
+ #:label-values `((compression . ,compression)))
+
+ ;; Use an explicit transaction as it handles the
+             ;; database being busy.
+ (database-call-with-transaction
+ database
+ (lambda _
+ (if (eq? action 'cached-narinfo-added)
+ (database-insert-cached-narinfo-file
+ database
+ narinfo-id
+ size
+ compression)
+ (let ((cached-narinfo-details
+ (database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database
+ narinfo-id
+ compression)))
+
+ ;; It might not have been scheduled for
+ ;; removal, but remove any schedule that
+ ;; exists
+ (let ((schedule-removed?
+ (database-delete-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-details 'id))))
+ (when schedule-removed?
+ (metric-decrement
+ (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total"))))
+
+ ;; Remove all the database entries first, as
+ ;; that'll stop these files appearing in narinfos
+ (database-remove-cached-narinfo-file
+ database
+ narinfo-id
+ (symbol->string compression))))))
+
+ (hash-remove! nar-cached-compression-usage-hash-table
+ narinfo-id)
+
+ (when reply
+ (put-message reply #t))
+
+ (loop (alist-cons
+ compression
+ updated-bytes
+ (alist-delete compression
+ cached-bytes-by-compression)))))))))
+ scheduler)
+
+ channel))
+
+;; Periodically check for nars that haven't been accessed in some time
+;; and schedule them for removal
+(define (start-cached-compression-schedule-removal-fiber
+ database
+ metrics-registry
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel
+ base-ttl)
+
+ (define (files-to-schedule-for-removal compression-details)
+ (let* ((directory (assq-ref compression-details 'directory))
+ (unused-removal-duration
+ (assq-ref compression-details 'unused-removal-duration))
+ (atime-threshold
+ (time-second
+ (subtract-duration (current-time)
+ unused-removal-duration))))
+ (scandir
+ directory
+ (lambda (filename)
+ (and
+ (< (stat:atime (stat (string-append directory "/" filename)))
+ atime-threshold)
+ (not (member filename '("." ".."))))))))
+
+ (define (schedule-removal compression compression-details)
+ (let ((files
+ (let ((files
+ (with-time-logging "files-to-schedule-for-removal"
+ (files-to-schedule-for-removal compression-details))))
+ (log-msg 'INFO "cached-compression-schedule-removal-fiber "
+ "looking at " (length files) " files")
+ files))
+ (count-metric
+ (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total")))
+
+ (with-time-logging "inserting scheduled-cached-narinfo-removals"
+ (for-each
+ (lambda (file)
+ (let* ((cached-narinfo-file-details
+ (database-select-cached-narinfo-file-by-hash
+ database
+ (string-take file 32) ; hash part
+ compression))
+ (existing-scheduled-removal
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id))))
+ (unless existing-scheduled-removal
+ (let ((removal-time
+ ;; The earliest this can be removed is the current
+ ;; time, plus the TTL
+ (add-duration
+ (current-time)
+ (make-time time-duration
+ 0
+ (or (assq-ref compression-details 'ttl)
+ base-ttl)))))
+ (database-insert-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id)
+ removal-time)
+
+ (metric-increment count-metric)))))
+ files))
+
+ ;; Wake the cached compression removal fiber in case one of
+ ;; the new scheduled removals is before it's scheduled to wake
+ ;; up
+ (put-message cached-compression-removal-fiber-wakeup-channel
+ #t)))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((sleep-duration
+ (max
+               (* 60 60 6) ; Maybe this should be configurable
+ (apply min
+ (map (lambda (compression-details)
+ (/ (time-second
+ (assq-ref compression-details
+ 'unused-removal-duration))
+ 4))
+ enabled-cached-compressions))))
+ (start-count
+ (database-count-scheduled-cached-narinfo-removal
+ database)))
+
+ (metric-set
+ (or (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total")
+ (make-gauge-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total"))
+ start-count)
+
+ (while #t
+ (log-msg 'INFO "cached-compression-schedule-removal-fiber starting pass")
+
+ (for-each
+ (match-lambda
+ ((compression . details)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "cached-compression-schedule-removal-fiber: "
+ "exception: " exn))
+ (lambda ()
+ (schedule-removal compression details))
+ #:unwind? #t)))
+ enabled-cached-compressions)
+
+ (log-msg 'INFO "cached-compression-schedule-removal-fiber sleeping for "
+ sleep-duration)
+ (sleep sleep-duration))))))
+
+;; Process the scheduled removals for cached nars
+(define (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)
+
+ (define wakeup-channel
+ (make-channel))
+
+ (define (make-pass)
+ (log-msg 'INFO "cached-compression-removal-fiber starting pass")
+
+ (let ((scheduled-cached-narinfo-removal
+ (database-select-oldest-scheduled-cached-narinfo-removal
+ database)))
+
+ (log-msg 'INFO "scheduled removal: " scheduled-cached-narinfo-removal)
+
+ (if scheduled-cached-narinfo-removal
+ (if (time<=? (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))
+ (let ((id (assq-ref scheduled-cached-narinfo-removal 'id))
+ (narinfo-id (assq-ref scheduled-cached-narinfo-removal
+ 'narinfo-id))
+ (compression (assq-ref scheduled-cached-narinfo-removal
+ 'compression))
+ (size (assq-ref scheduled-cached-narinfo-removal
+ 'size))
+ (store-path (assq-ref scheduled-cached-narinfo-removal
+ 'store-path)))
+ (let ((reply (make-channel)))
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ narinfo-id
+ compression
+ size
+ reply))
+
+ ;; Wait for the management fiber to delete the
+ ;; database entry before removing the file.
+ (get-message reply))
+
+ (let ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (let ((filename
+ (string-append
+ directory "/"
+ (basename store-path))))
+ (log-msg 'INFO "deleting " filename)
+ (delete-file filename))))
+
+ (let ((duration
+ (time-difference
+ (assq-ref scheduled-cached-narinfo-removal
+ 'scheduled-removal-time)
+ (current-time))))
+ (perform-operation
+ (choice-operation
+ (sleep-operation (max 1 (+ 1 (time-second duration))))
+ (get-operation wakeup-channel)))))
+
+ ;; Sleep until woken
+ (get-message wakeup-channel))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in cached-compression-removal-fiber: "
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ make-pass
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))))
+
+ wakeup-channel)
+
+(define* (make-compressed-nar narinfo-files
+ nar-source
+ enabled-cached-compressions
+ target-compression
+ #:key level)
+ (define cached-compression-details
+ (assq-ref enabled-cached-compressions target-compression))
+
+ (log-msg 'INFO "making " target-compression " for "
+ (uri-decode
+ (basename
+ (assq-ref (first narinfo-files) 'url))))
+
+ (let* ((source-narinfo-file
+         ;; There's no specific logic to this; it should be possible
+ ;; to use any file
+ (first narinfo-files))
+ (source-filename
+ (cond
+ ((string-prefix? "http" nar-source)
+ (let* ((output-port (mkstemp "/tmp/nar-herder-source-nar-XXXXXX"))
+ (filename
+ (port-filename output-port))
+ (uri
+ (string->uri
+ (string-append nar-source
+ (assq-ref source-narinfo-file 'url)))))
+
+ (log-msg 'DEBUG "downloading " (uri->string uri))
+ (with-exception-handler
+ (lambda (exn)
+ (close-port output-port)
+ (delete-file filename)
+ (raise-exception exn))
+ (lambda ()
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:streaming? #t)))
+ (lambda (response body)
+ (unless (= (response-code response)
+ 200)
+ (error "unknown response code"
+ (response-code response)))
+
+ (dump-port body output-port)))
+ (close-port output-port))
+ #:timeout 30))
+ #:unwind? #t)
+
+ filename))
+ ((string-prefix? "/" nar-source)
+ (string-append
+ ;; If it's a filename, then it's the canonical path to
+ ;; the storage directory
+ nar-source
+ (uri-decode (assq-ref source-narinfo-file 'url))))
+ (else
+ (error "unknown nar source")))))
+
+ (let* ((dest-directory
+ (assq-ref cached-compression-details
+ 'directory))
+ (dest-filename
+ (string-append
+ dest-directory
+ "/"
+ (uri-decode
+ (basename
+ (assq-ref source-narinfo-file 'url)))))
+ (tmp-dest-filename
+ (string-append dest-filename ".tmp")))
+
+ (when (file-exists? tmp-dest-filename)
+ (delete-file tmp-dest-filename))
+ (when (file-exists? dest-filename)
+ (delete-file dest-filename))
+
+ (mkdir-p dest-directory)
+
+ (call-with-input-file
+ source-filename
+ (lambda (source-port)
+ (call-with-decompressed-port
+ (string->symbol
+ (assq-ref source-narinfo-file
+ 'compression))
+ source-port
+ (lambda (decompressed-source-port)
+ (let ((call-with-compressed-output-port*
+ (match target-compression
+ ('gzip
+ (@ (zlib) call-with-gzip-output-port))
+ ('lzip
+ (@ (lzlib) call-with-lzip-output-port))
+ ('zstd
+ (@ (zstd) call-with-zstd-output-port))
+ ('none
+ (lambda (port proc)
+ (proc port))))))
+ (apply
+ call-with-compressed-output-port*
+ (open-output-file tmp-dest-filename)
+ (lambda (compressed-port)
+ (dump-port decompressed-source-port
+ compressed-port))
+ (if level
+ `(#:level ,level)
+ '())))))))
+ (rename-file
+ tmp-dest-filename
+ dest-filename)
+
+ (when (string-prefix? "http" nar-source)
+ (log-msg 'DEBUG "deleting temporary file " source-filename)
+ (delete-file source-filename))
+
+ (let ((bytes
+ (stat:size (stat dest-filename))))
+ (log-msg 'INFO "created " dest-filename)
+
+ bytes))))
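
The management fiber above returns the channel it listens on, and its loop handles two kinds of messages: (narinfo-id . id) pairs recording that a nar was requested, and cached-narinfo-added / cached-narinfo-removed notifications for bookkeeping. The sketch below is a hypothetical illustration of the caller side, not part of this patch; the real wiring lives in nar-herder/server.scm (not shown in this diff), and database, metrics-registry, nar-source, enabled-cached-compressions and narinfo-id stand for values the server already has.

(use-modules (fibers channels)               ; put-message
             (nar-herder cached-compression))

;; Hypothetical sketch, not part of this patch.
(define management-channel
  (start-cached-compression-management-fiber
   database
   metrics-registry
   nar-source                     ; storage directory or mirror URL
   enabled-cached-compressions
   3))                            ; cached-compression-min-uses (example value)

;; On each nar request, report which narinfo was served; the fiber
;; bumps its usage count and, once the count exceeds the minimum,
;; queues jobs to generate any missing cached compressions.
(put-message management-channel (cons 'narinfo-id narinfo-id))
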
diff --git a/nar-herder/database.scm b/nar-herder/database.scm
index 61f5bb1..ded7c2c 100644
--- a/nar-herder/database.scm
+++ b/nar-herder/database.scm
@@ -28,10 +28,12 @@
#:use-module (sqlite3)
#:use-module (fibers)
#:use-module (prometheus)
+ #:use-module (guix store)
#:use-module (guix narinfo)
#:use-module (guix derivations)
#:use-module (nar-herder utils)
#:export (setup-database
+ update-database-metrics!
database-optimize
database-spawn-fibers
@@ -42,8 +44,11 @@
database-insert-narinfo
database-remove-narinfo
+ database-select-narinfo
+ database-select-narinfo-by-hash
database-select-narinfo-contents-by-hash
+ database-count-recent-changes
database-select-recent-changes
database-select-latest-recent-change-datetime
database-get-recent-changes-id-for-deletion
@@ -51,8 +56,25 @@
database-select-narinfo-for-file
database-select-narinfo-files
-
- database-map-all-narinfo-files))
+ database-select-narinfo-files-by-narinfo-id
+
+ database-fold-all-narinfo-files
+ database-map-all-narinfo-files
+ database-count-narinfo-files
+
+ database-insert-cached-narinfo-file
+ database-select-cached-narinfo-file-by-hash
+ database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database-select-cached-narinfo-files-by-narinfo-id
+ database-fold-cached-narinfo-files
+ database-remove-cached-narinfo-file
+
+ database-select-scheduled-narinfo-removal
+ database-select-scheduled-cached-narinfo-removal
+ database-delete-scheduled-cached-narinfo-removal
+ database-select-oldest-scheduled-cached-narinfo-removal
+ database-count-scheduled-cached-narinfo-removal
+ database-insert-scheduled-cached-narinfo-removal))
(define-record-type <database>
(make-database database-file reader-thread-channel writer-thread-channel
@@ -85,7 +107,8 @@ CREATE TABLE narinfos (
nar_size INTEGER NOT NULL,
deriver TEXT,
system TEXT,
- contents NOT NULL
+ contents NOT NULL,
+ added_at TEXT
);
CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32));
@@ -97,6 +120,8 @@ CREATE TABLE narinfo_files (
url TEXT NOT NULL
);
+CREATE INDEX narinfo_files_narinfo_id ON narinfo_files (narinfo_id);
+
CREATE TABLE narinfo_references (
narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
store_path TEXT NOT NULL
@@ -120,28 +145,124 @@ CREATE TABLE recent_changes (
datetime TEXT NOT NULL,
change TEXT NOT NULl,
data TEXT NOT NULL
+);
+
+CREATE TABLE cached_narinfo_files (
+ id INTEGER PRIMARY KEY ASC,
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ size INTEGER NOT NULL,
+ compression TEXT
+);
+
+CREATE INDEX cached_narinfo_files_narinfo_id
+ ON cached_narinfo_files (narinfo_id);
+
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);
+
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
);")
(sqlite-exec db schema))
-(define (update-schema db)
+(define (table-exists? db name)
(let ((statement
(sqlite-prepare
db
"
-SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
+SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = :name")))
(sqlite-bind-arguments
statement
- #:name "narinfos")
+ #:name name)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#f #f)
+ (#(1) #t))))
+ (sqlite-finalize statement)
- (match (sqlite-step statement)
- (#f (perform-initial-database-setup db))
- (_ #f))
+ result)))
+
+(define (column-exists? db table-name column-name)
+ (let ((statement
+ (sqlite-prepare
+ db
+ (simple-format #f "PRAGMA table_info(~A);" table-name))))
+
+ (let ((columns
+ (sqlite-map
+ (lambda (row)
+ (vector-ref row 1))
+ statement)))
+ (sqlite-finalize statement)
+
+ (member column-name columns))))
+
+(define (update-schema db)
+ (unless (table-exists? db "narinfos")
+ (perform-initial-database-setup db))
+
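+ ;; The migrations below are additive and guarded by checks, so
+ ;; it's safe to run them on every startup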
+ (unless (table-exists? db "cached_narinfo_files")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE cached_narinfo_files (
+ id INTEGER PRIMARY KEY ASC,
+ narinfo_id INTEGER NOT NULL REFERENCES narinfos (id),
+ size INTEGER NOT NULL,
+ compression TEXT NOT NULL
+);
- (sqlite-finalize statement)))
+CREATE INDEX cached_narinfo_files_narinfo_id
+ ON cached_narinfo_files (narinfo_id);"))
+
+ (unless (column-exists? db "narinfos" "added_at")
+ (sqlite-exec
+ db
+ "ALTER TABLE narinfos ADD COLUMN added_at TEXT;"))
+
+ (unless (table-exists? db "scheduled_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_narinfo_removal (
+ narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
+ (unless (table-exists? db "scheduled_cached_narinfo_removal")
+ (sqlite-exec
+ db
+ "
+CREATE TABLE scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id),
+ removal_datetime TEXT NOT NULL
+);"))
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id
+ ON narinfo_tags (narinfo_id);")
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS narinfo_references_narinfo_id
+ ON narinfo_references (narinfo_id);")
+
+ (sqlite-exec
+ db
+ "CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id
+ ON narinfo_files (narinfo_id);"))
+
+(define* (setup-database database-file metrics-registry
+ #:key (reader-threads 1))
+ (define mmap-size #f)
-(define (setup-database database-file metrics-registry)
(let ((db (db-open database-file)))
(sqlite-exec db "PRAGMA journal_mode=WAL;")
(sqlite-exec db "PRAGMA optimize;")
@@ -149,44 +270,77 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
(update-schema db)
+ ;; (let ((requested-mmap-bytes 2147418112)
+ ;; (statement
+ ;; (sqlite-prepare
+ ;; db
+ ;; (simple-format #f "PRAGMA mmap_size=~A;"
+ ;; 2147418112))))
+ ;; (match (sqlite-step statement)
+ ;; (#(result-mmap-size)
+ ;; (sqlite-finalize statement)
+ ;; (set! mmap-size
+ ;; result-mmap-size))))
+
(sqlite-close db))
(let ((reader-thread-channel
- (make-worker-thread-channel
+ (make-worker-thread-set
(lambda ()
(let ((db
(db-open database-file #:write? #f)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
+ (when mmap-size
+ (sqlite-exec
+ db
+ (simple-format #f "PRAGMA mmap_size=~A;"
+ (number->string mmap-size))))
(list db)))
#:destructor
(lambda (db)
(sqlite-close db))
#:lifetime 50000
+ #:name "db r"
- ;; Use a minimum of 2 and a maximum of 8 threads
- #:parallelism
- (min (max (current-processor-count)
- 2)
- 64)
+ #:parallelism reader-threads
#:delay-logger (let ((delay-metric
(make-histogram-metric
metrics-registry
"database_read_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric seconds-delayed)
(when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database read delayed by ~1,2f seconds~%"
- seconds-delayed))))))
+ (display
+ (format
+ #f
+ "warning: database read (~a) delayed by ~1,2f seconds~%"
+ proc
+ seconds-delayed)
+ (current-error-port)))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 5)
+ (display
+ (format
+ #f
+ "warning: database read took ~1,2f seconds (~a)~%"
+ duration
+ proc)
+ (current-error-port))))))
(writer-thread-channel
- (make-worker-thread-channel
+ (make-worker-thread-set
(lambda ()
(let ((db
(db-open database-file)))
(sqlite-exec db "PRAGMA busy_timeout = 5000;")
(sqlite-exec db "PRAGMA foreign_keys = ON;")
+ (when mmap-size
+ (sqlite-exec
+ db
+ (simple-format #f "PRAGMA mmap_size=~A;"
+ (number->string mmap-size))))
(list db)))
#:destructor
(lambda (db)
@@ -195,6 +349,7 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
(sqlite-close db))
#:lifetime 500
+ #:name "db w"
;; SQLite doesn't support parallel writes
#:parallelism 1
@@ -202,19 +357,56 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name")))
(make-histogram-metric
metrics-registry
"database_write_delay_seconds")))
- (lambda (seconds-delayed)
+ (lambda (seconds-delayed proc)
(metric-observe delay-metric seconds-delayed)
(when (> seconds-delayed 1)
- (format
- (current-error-port)
- "warning: database write delayed by ~1,2f seconds~%"
- seconds-delayed)))))))
+ (display
+ (format
+ #f
+ "warning: database write (~a) delayed by ~1,2f seconds~%"
+ proc
+ seconds-delayed)
+ (current-error-port)))))
+ #:duration-logger
+ (lambda (duration proc)
+ (when (> duration 5)
+ (display
+ (format
+ #f
+ "warning: database write took ~1,2f seconds (~a)~%"
+ duration
+ proc)
+ (current-error-port)))))))
(make-database database-file
reader-thread-channel
writer-thread-channel
metrics-registry)))
+(define (update-database-metrics! database)
+ (let* ((db-filename (database-file database))
+ (db-wal-filename
+ (string-append db-filename "-wal"))
+
+ (registry (database-metrics-registry database))
+ (db-bytes
+ (or (metrics-registry-fetch-metric registry
+ "database_bytes")
+ (make-gauge-metric
+ registry "database_bytes"
+ #:docstring "Size of the SQLite database file")))
+ (db-wal-bytes
+ (or (metrics-registry-fetch-metric registry
+ "database_wal_bytes")
+ (make-gauge-metric
+ registry "database_wal_bytes"
+ #:docstring "Size of the SQLite Write Ahead Log file"))))
+
+ (metric-set db-bytes (stat:size (stat db-filename)))
+ (metric-set db-wal-bytes (stat:size (stat db-wal-filename))))
+ #t)
+
(define (db-optimize db db-filename)
(define (wal-size)
(let ((db-wal-filename
@@ -267,43 +459,110 @@ PRAGMA optimize;")))
(string-append "database_" thing "_duration_seconds"))
(if registry
- (let* ((metric
- (or (metrics-registry-fetch-metric registry metric-name)
- (make-histogram-metric registry
- metric-name)))
- (start-time (get-internal-real-time)))
- (let ((result (thunk)))
- (metric-observe metric
- (/ (- (get-internal-real-time) start-time)
- internal-time-units-per-second))
- result))
+ (call-with-duration-metric registry
+ metric-name
+ thunk)
(thunk)))
(define %current-transaction-proc
(make-parameter #f))
(define* (database-call-with-transaction database proc
- #:key
- readonly?)
+ #:key
+ readonly?
+ (immediate? (not readonly?)))
(define (run-proc-within-transaction db)
- (if (%current-transaction-proc)
- (proc db) ; already in transaction
- (begin
- (sqlite-exec db "BEGIN TRANSACTION;")
- (with-exception-handler
- (lambda (exn)
- (simple-format (current-error-port)
- "error: sqlite rolling back transaction\n")
- (sqlite-exec db "ROLLBACK TRANSACTION;")
- (raise-exception exn))
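+ ;; SQLite reports error code 5 (SQLITE_BUSY) when another
+ ;; connection holds a conflicting lock, so treat that as a
+ ;; retryable failure when beginning and committing transactions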
+ (define (attempt-begin)
+ (with-exception-handler
+ (lambda (exn)
+ (match (exception-args exn)
+ (('sqlite-exec 5 msg)
+ (simple-format
+ (current-error-port)
+ "warning: issue starting transaction (code: 5, proc: ~A): ~A\n"
+ proc msg)
+ #f)
+ (_
+ (simple-format (current-error-port)
+ "exception starting transaction: ~A\n" exn)
+ (raise-exception exn))))
+ (lambda ()
+ (sqlite-exec db (if immediate?
+ "BEGIN IMMEDIATE TRANSACTION;"
+ "BEGIN TRANSACTION;"))
+ #t)
+ #:unwind? #t))
+
+ (define (attempt-commit)
+ (with-exception-handler
+ (lambda (exn)
+ (match (exception-args exn)
+ (('sqlite-exec 5 msg)
+ (simple-format
+ (current-error-port)
+ "warning: attempt commit (code: 5, proc: ~A): ~A\n"
+ proc msg)
+ #f)
+ (_
+ (simple-format (current-error-port)
+ "exception committing transaction: ~A\n" exn)
+ (raise-exception exn))))
+ (lambda ()
+ (sqlite-exec db "COMMIT TRANSACTION;")
+ #t)
+ #:unwind? #t))
+
+ (if (attempt-begin)
+ (call-with-values
(lambda ()
- (call-with-values
- (lambda ()
- (parameterize ((%current-transaction-proc proc))
- (proc db)))
- (lambda vals
- (sqlite-exec db "COMMIT TRANSACTION;")
- (apply values vals))))))))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format (current-error-port)
+ "error: sqlite rolling back transaction (~A)\n"
+ exn)
+ (sqlite-exec db "ROLLBACK TRANSACTION;")
+ (raise-exception exn))
+ (lambda ()
+ (parameterize ((%current-transaction-proc proc))
+ (proc-with-duration-timing db)))
+ #:unwind? #t))
+ (lambda vals
+ (let loop ((success? (attempt-commit)))
+ (if success?
+ (apply values vals)
+ (loop (attempt-commit))))))
+
+ ;; Database is busy, so retry
+ (run-proc-within-transaction db)))
+
+ (define (proc-with-duration-timing db)
+ (let ((start-time (get-internal-real-time)))
+ (call-with-values
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (proc db))
+ (lambda (key . args)
+ (simple-format
+ (current-error-port)
+ "exception in transaction: ~A: ~A\n"
+ key args)
+ (backtrace))))
+ (lambda vals
+ (let ((duration-seconds
+ (/ (- (get-internal-real-time) start-time)
+ internal-time-units-per-second)))
+ (when (and (not readonly?)
+ (> duration-seconds 2))
+ (display
+ (format
+ #f
+ "warning: ~a:\n took ~4f seconds in transaction\n"
+ proc
+ duration-seconds)
+ (current-error-port)))
+
+ (cons duration-seconds vals))))))
(match (call-with-worker-thread
((if readonly?
@@ -311,25 +570,9 @@ PRAGMA optimize;")))
database-writer-thread-channel)
database)
(lambda (db)
- (let ((start-time (get-internal-real-time)))
- (call-with-values
- (lambda ()
- (run-proc-within-transaction db))
- (lambda vals
- (let ((duration-seconds
- (/ (- (get-internal-real-time) start-time)
- internal-time-units-per-second)))
- (when (and (not readonly?)
- (> duration-seconds 2))
- (display
- (format
- #f
- "warning: ~a:\n took ~4f seconds in transaction\n"
- proc
- duration-seconds)
- (current-error-port)))
-
- (cons duration-seconds vals)))))))
+ (if (%current-transaction-proc)
+ (proc-with-duration-timing db) ; already in transaction
+ (run-proc-within-transaction db))))
((duration vals ...)
(apply values vals))))
@@ -417,9 +660,9 @@ SELECT id FROM tags WHERE key = :key AND value = :value"
db
"
INSERT INTO narinfos (
- store_path, nar_hash, nar_size, deriver, system, contents
+ store_path, nar_hash, nar_size, deriver, system, contents, added_at
) VALUES (
- :store_path, :nar_hash, :nar_size, :deriver, :system, :contents
+ :store_path, :nar_hash, :nar_size, :deriver, :system, :contents, :added_at
)"
#:cache? #t)))
(sqlite-bind-arguments
@@ -429,7 +672,8 @@ INSERT INTO narinfos (
#:nar_size (narinfo-size narinfo)
#:deriver (narinfo-deriver narinfo)
#:system (narinfo-system narinfo)
- #:contents (narinfo-contents narinfo))
+ #:contents (narinfo-contents narinfo)
+ #:added_at (date->string (current-date) "~1 ~3"))
(sqlite-step statement)
(sqlite-reset statement)
@@ -563,23 +807,6 @@ INSERT INTO narinfo_tags (narinfo_id, tag_id) VALUES (:narinfo_id, :tag_id)"
(define* (database-remove-narinfo database store-path
#:key change-datetime)
- (define (store-path->narinfo-id db)
- (let ((statement
- (sqlite-prepare
- db
- "
-SELECT id FROM narinfos WHERE store_path = :store_path"
- #:cache? #t)))
-
- (sqlite-bind-arguments
- statement
- #:store_path store-path)
-
- (let ((result (vector-ref (sqlite-step statement) 0)))
- (sqlite-reset statement)
-
- result)))
-
(define (remove-narinfo-record db id)
(let ((statement
(sqlite-prepare
@@ -677,18 +904,90 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
(database-call-with-transaction
database
(lambda (db)
- (let ((narinfo-id (store-path->narinfo-id db)))
- (if change-datetime
- (insert-change-with-datetime db store-path
- change-datetime)
- (insert-change db store-path))
+ (let ((narinfo-details
+ (database-select-narinfo-by-hash
+ database
+ (store-path-hash-part store-path))))
+ (if narinfo-details
+ (let ((narinfo-id (assq-ref narinfo-details
+ 'id)))
+ (if change-datetime
+ (insert-change-with-datetime db store-path
+ change-datetime)
+ (insert-change db store-path))
+
+ (remove-narinfo-files db narinfo-id)
+ (remove-narinfo-references db narinfo-id)
+ (remove-tags db narinfo-id)
+
+ (remove-narinfo-record db narinfo-id)
+ #t)
+ #f)))))
+
+(define (database-select-narinfo database id)
+ (call-with-time-tracking
+ database
+ "select_narinfo"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT store_path, nar_hash, nar_size, deriver, system
+FROM narinfos
+WHERE id = :id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:id id)
- (remove-narinfo-files db narinfo-id)
- (remove-narinfo-references db narinfo-id)
- (remove-tags db narinfo-id)
- (remove-narinfo-record db narinfo-id)
+ (match (let ((result (sqlite-step statement)))
+ (sqlite-reset statement)
+ result)
+ (#(store_path nar_hash nar_size deriver system)
+ `((store-path . ,store_path)
+ (nar-hash . ,nar_hash)
+ (nar-size . ,nar_size)
+ (deriver . ,deriver)
+ (system . ,system)))
+ (_
+ #f))))))))
+
+(define (database-select-narinfo-by-hash database hash)
+ (call-with-time-tracking
+ database
+ "select_narinfo_by_hash"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
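+ ;; Store paths look like /gnu/store/<32 character hash>-<name>,
+ ;; so substr(store_path, 12, 32) extracts the hash part, matching
+ ;; the narinfos_store_hash index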
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT id, store_path, nar_hash, nar_size, deriver, system
+FROM narinfos
+WHERE substr(store_path, 12, 32) = :hash"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:hash hash)
- #t))))
+ (match (let ((result (sqlite-step statement)))
+ (sqlite-reset statement)
+ result)
+ (#(id store_path nar_hash nar_size deriver system)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (nar-hash . ,nar_hash)
+ (nar-size . ,nar_size)
+ (deriver . ,deriver)
+ (system . ,system)))
+ (_
+ #f))))))))
(define (database-select-narinfo-contents-by-hash database hash)
(call-with-time-tracking
@@ -702,7 +1001,7 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id"
(sqlite-prepare
db
"
-SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
+SELECT id, contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
#:cache? #t)))
(sqlite-bind-arguments
statement
@@ -711,8 +1010,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
(match (let ((result (sqlite-step statement)))
(sqlite-reset statement)
result)
- (#(contents) contents)
- (_ #f))))))))
+ (#(id contents)
+ (values contents id))
+ (_
+ (values #f #f)))))))))
+
+(define (database-count-recent-changes database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT COUNT(*) FROM recent_changes"
+ #:cache? #t)))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(count) count))))
+ (sqlite-reset statement)
+ result)))))
(define* (database-select-recent-changes database after-date #:key (limit 8192))
(call-with-worker-thread
@@ -722,26 +1040,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash"
(sqlite-prepare
db
"
-SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime LIMIT :limit"
+SELECT datetime, change, data
+FROM recent_changes
+WHERE datetime >= :datetime
+ORDER BY datetime ASC
+LIMIT :limit"
#:cache? #t)))
(sqlite-bind-arguments
statement
#:datetime after-date
#:limit limit)
- (let loop ((row (sqlite-step statement))
- (result '()))
- (match row
- (#(datetime change data)
- (loop (sqlite-step statement)
- (cons `((datetime . ,datetime)
- (change . ,change)
- (data . ,data))
- result)))
- (#f
- (sqlite-reset statement)
-
- (reverse result))))))))
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(datetime change data)
+ `((datetime . ,datetime)
+ (change . ,change)
+ (data . ,data))))
+ statement)))
+ (sqlite-reset statement)
+ result)))))
(define (database-select-latest-recent-change-datetime database)
(call-with-worker-thread
@@ -869,7 +1188,42 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash"
result)))))))
-(define (database-map-all-narinfo-files database proc)
+(define (database-select-narinfo-files-by-narinfo-id database narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_narinfo_files_by_narinfo_id"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url
+FROM narinfos
+INNER JOIN narinfo_files
+ ON narinfos.id = narinfo_files.narinfo_id
+WHERE narinfos.id = :narinfo_id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(size compression url)
+ `((size . ,size)
+ (compression . ,compression)
+ (url . ,url))))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-fold-all-narinfo-files database proc init)
(call-with-worker-thread
(database-reader-thread-channel database)
(lambda (db)
@@ -885,12 +1239,410 @@ FROM narinfo_files"
(lambda (row result)
(match row
(#(size compression url)
- (cons (proc `((size . ,size)
- (compression . ,compression)
- (url . ,url)))
+ (proc `((size . ,size)
+ (compression . ,compression)
+ (url . ,url))
+ result))))
+ init
+ statement)))
+ (sqlite-reset statement)
+
+ result-list)))))
+
+(define (database-map-all-narinfo-files database proc)
+ (database-fold-all-narinfo-files
+ database
+ (lambda (nar-file result)
+ (cons (proc nar-file)
+ result))
+ '()))
+
+(define (database-count-narinfo-files database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT COUNT(*) FROM narinfo_files"
+ #:cache? #t)))
+
+ (let ((result
+ (vector-ref (sqlite-step statement)
+ 0)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-insert-cached-narinfo-file database
+ narinfo-id
+ size
+ compression)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO cached_narinfo_files (
+ narinfo_id, size, compression
+) VALUES (
+ :narinfo_id, :size, :compression
+)"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:size size
+ #:compression (symbol->string compression))
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ (last-insert-rowid db)))))
+
+(define (database-select-cached-narinfo-file-by-hash database
+ hash
+ compression)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_hash"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id, cached_narinfo_files.size
+FROM narinfos
+INNER JOIN cached_narinfo_files
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE substr(narinfos.store_path, 12, 32) = :hash
+ AND cached_narinfo_files.compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:hash hash
+ #:compression (symbol->string compression))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id size)
+ `((id . ,id)
+ (size . ,size)))
+ (#f #f))))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_narinfo_id"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE narinfo_id = :narinfo_id"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (sqlite-map
+ (match-lambda
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (size . ,size)
+ (compression . ,(string->symbol compression)))))
+ statement)))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-select-cached-narinfo-file-by-narinfo-id-and-compression
+ database
+ narinfo-id
+ compression)
+ (call-with-time-tracking
+ database
+ "select_cached_narinfo_file_by_narinfo_id_and_compression"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ narinfos.store_path,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+WHERE narinfo_id = :narinfo_id
+ AND compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:compression (symbol->string compression))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id store_path size compression)
+ `((id . ,id)
+ (store-path . ,store_path)
+ (size . ,size)
+ (compression . ,(string->symbol compression)))))))
+ (sqlite-reset statement)
+
+ result)))))))
+
+(define (database-fold-cached-narinfo-files database
+ proc
+ init)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT store_path, size, compression, narinfo_id
+FROM cached_narinfo_files
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id"
+ #:cache? #t)))
+ (let ((result-list
+ (sqlite-fold
+ (lambda (row result)
+ (match row
+ (#(store_path size compression narinfo_id)
+ (proc `((size . ,size)
+ (compression . ,(string->symbol compression))
+ (store-path . ,store_path)
+ (narinfo-id . ,narinfo_id))
result))))
- '()
+ init
statement)))
(sqlite-reset statement)
result-list)))))
+
+(define (database-remove-cached-narinfo-file database narinfo-id compression)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM cached_narinfo_files
+WHERE narinfo_id = :narinfo_id
+ AND compression = :compression"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id
+ #:compression compression)
+
+ (sqlite-step statement)
+ (sqlite-reset statement)))))
+
+(define (database-select-scheduled-narinfo-removal database narinfo-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_narinfo_removal
+WHERE narinfo_id = :narinfo_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:narinfo_id narinfo-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-select-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-time-tracking
+ database
+ "select_scheduled_narinfo_removal"
+ (lambda ()
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT removal_datetime
+FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(datetime)
+ (date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))))
+
+(define (database-delete-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+DELETE FROM scheduled_cached_narinfo_removal
+WHERE cached_narinfo_file_id = :cached_narinfo_file_id
+RETURNING 1"
+ #:cache? #t)))
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id)
+
+ (let ((result (->bool (sqlite-step statement))))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-select-oldest-scheduled-cached-narinfo-removal database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT cached_narinfo_files.id,
+ cached_narinfo_files.narinfo_id,
+ cached_narinfo_files.size,
+ cached_narinfo_files.compression,
+ narinfos.store_path,
+ scheduled_cached_narinfo_removal.removal_datetime
+FROM scheduled_cached_narinfo_removal
+INNER JOIN cached_narinfo_files
+ ON scheduled_cached_narinfo_removal.cached_narinfo_file_id =
+ cached_narinfo_files.id
+INNER JOIN narinfos
+ ON cached_narinfo_files.narinfo_id = narinfos.id
+ORDER BY scheduled_cached_narinfo_removal.removal_datetime ASC
+LIMIT 1"
+ #:cache? #t)))
+
+ (let ((result
+ (match (sqlite-step statement)
+ (#(id narinfo_id size compression store_path datetime)
+ `((id . ,id)
+ (narinfo-id . ,narinfo_id)
+ (size . ,size)
+ (compression . ,(string->symbol compression))
+ (store-path . ,store_path)
+ (scheduled-removal-time . ,(date->time-utc
+ (string->date
+ datetime
+ "~Y-~m-~d ~H:~M:~S")))))
+ (#f #f))))
+ (sqlite-reset statement)
+ result)))))
+
+(define (database-count-scheduled-cached-narinfo-removal database)
+ (call-with-worker-thread
+ (database-reader-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+SELECT COUNT(*) FROM scheduled_cached_narinfo_removal"
+ #:cache? #t)))
+
+ (let ((result
+ (vector-ref (sqlite-step statement)
+ 0)))
+ (sqlite-reset statement)
+
+ result)))))
+
+(define (database-insert-scheduled-cached-narinfo-removal database
+ cached-narinfo-file-id
+ removal-datetime)
+ (call-with-worker-thread
+ (database-writer-thread-channel database)
+ (lambda (db)
+ (let ((statement
+ (sqlite-prepare
+ db
+ "
+INSERT INTO scheduled_cached_narinfo_removal (
+ cached_narinfo_file_id, removal_datetime
+) VALUES (
+ :cached_narinfo_file_id, :removal_datetime
+)"
+ #:cache? #t)))
+
+ (sqlite-bind-arguments
+ statement
+ #:cached_narinfo_file_id cached-narinfo-file-id
+ #:removal_datetime (date->string
+ (time-utc->date removal-datetime)
+ "~Y-~m-~d ~H:~M:~S"))
+
+ (sqlite-step statement)
+ (sqlite-reset statement)
+
+ #t))))
diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm
index 6b9f4f7..a784165 100644
--- a/nar-herder/mirror.scm
+++ b/nar-herder/mirror.scm
@@ -19,6 +19,7 @@
(define-module (nar-herder mirror)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-43)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (ice-9 exceptions)
@@ -29,22 +30,18 @@
#:use-module (prometheus)
#:use-module (logging logger)
#:use-module (json)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (guix narinfo)
#:use-module ((guix store) #:select (store-path-hash-part))
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
#:use-module (nar-herder storage)
- #:export (start-fetch-changes-thread))
-
-(define (start-fetch-changes-thread database storage-root
- mirror metrics-registry)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
+ #:export (start-fetch-changes-fiber))
+(define (start-fetch-changes-fiber database metrics-registry
+ storage-root mirror
+ cached-compression-management-channel)
(define (request-recent-changes)
(define latest-recent-change
(database-select-latest-recent-change-datetime database))
@@ -72,18 +69,25 @@
(lambda ()
(retry-on-error
(lambda ()
- (http-get uri #:decode-body? #f))
+ (log-msg 'INFO "querying for recent changes since "
+ latest-recent-change)
+ (with-port-timeouts
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:streaming? #t)))
+ #:timeout 30))
#:times 3
#:delay 15))
(lambda (response body)
(if (= (response-code response) 200)
- (let* ((json-body (json-string->scm
- (utf8->string body)))
+ (let* ((json-body (json->scm body))
(recent-changes
(assoc-ref json-body "recent_changes")))
- (log-msg 'INFO "queried for recent changes since "
- latest-recent-change)
(log-msg 'INFO "got " (vector-length recent-changes) " changes")
;; Switch to symbol keys and standardise the key order
@@ -118,46 +122,75 @@
(assq-ref change-details
'datetime))
- (let ((new-files-count (length (narinfo-uris narinfo))))
- (metric-increment nar-files-metric
- #:by new-files-count
- ;; TODO This should be
- ;; checked, rather than
- ;; assumed to be false
- #:label-values '((stored . "false"))))))
+ (and=> (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (lambda (metric)
+ ;; Only update this metric if it
+ ;; already exists, since then it's
+ ;; already set to a meaningful value
+ (let ((new-files-count
+ (length (narinfo-uris narinfo))))
+ (metric-increment
+ metric
+ #:by new-files-count
+ ;; TODO This should be
+ ;; checked, rather than
+ ;; assumed to be false
+ #:label-values '((stored . "false"))))))))
((string=? change "removal")
(let ((store-path (assq-ref change-details 'data)))
(log-msg 'INFO "processing removal change for "
store-path
" (" (assq-ref change-details 'datetime) ")")
+ (let* ((hash (store-path-hash-part store-path))
+ (narinfo-details
+ (database-select-narinfo-by-hash
+ database
+ hash)))
+
(when storage-root
(remove-nar-files-by-hash
database
storage-root
metrics-registry
- (store-path-hash-part store-path)))
+ hash))
+
+ (let ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ (assq-ref narinfo-details 'id))))
+ (for-each
+ (lambda (cached-narinfo-file-details)
+ ;; TODO Delete the file as well
+
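+ ;; Tell the cached compression management fiber
+ ;; about the removal and wait for it to
+ ;; acknowledge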
+ (let ((reply (make-channel)))
+ (put-message
+ cached-compression-management-channel
+ (list 'cached-narinfo-removed
+ (assq-ref narinfo-details 'id)
+ (assq-ref cached-narinfo-file-details 'compression)
+ (assq-ref cached-narinfo-file-details 'size)
+ reply))
+ (get-message reply)))
+ cached-narinfo-files))
(database-remove-narinfo database
store-path
#:change-datetime
(assq-ref change-details
- 'datetime))))
+ 'datetime)))))
(else
(error "unimplemented"))))))
recent-changes))
(raise-exception
(make-exception-with-message
- (simple-format #f "unknown response: ~A\n code: ~A response: ~A"
+ (simple-format #f "unknown response: ~A code: ~A"
(uri->string uri)
- (response-code response)
- (utf8->string body))))))))
+ (response-code response))))))))
- (call-with-new-thread
+ (spawn-fiber
(lambda ()
- ;; This will initialise the nar_files_total metric
- (get-nar-files database storage-root metrics-registry)
-
(while #t
(with-exception-handler
(lambda (exn)
@@ -165,4 +198,5 @@
request-recent-changes
#:unwind? #t)
+ (log-msg 'DEBUG "finished requesting recent changes, sleeping")
(sleep 60)))))
diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm
index fee63f3..ccfff93 100644
--- a/nar-herder/recent-changes.scm
+++ b/nar-herder/recent-changes.scm
@@ -18,15 +18,25 @@
(define-module (nar-herder recent-changes)
#:use-module (srfi srfi-1)
+ #:use-module (ice-9 match)
#:use-module (ice-9 threads)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (logging logger)
+ #:use-module (prometheus)
+ #:use-module (web uri)
+ #:use-module (guix narinfo)
#:use-module (nar-herder database)
- #:export (start-recent-change-removal-and-database-dump-thread))
+ #:export (start-recent-change-removal-and-database-dump-fiber
+ start-recent-change-listener-fiber))
-(define (start-recent-change-removal-and-database-dump-thread database
- database-dump-filename
- check-interval
- recent-changes-limit)
+(define (start-recent-change-removal-and-database-dump-fiber database
+ metrics-registry
+ database-dump-filename
+ check-interval
+ recent-changes-limit)
(define (update-database-dump)
+ (log-msg 'DEBUG "updating the database dump at " database-dump-filename)
(let ((temp-database-dump-filename
(string-append database-dump-filename ".tmp")))
@@ -41,23 +51,127 @@
(simple-format (current-error-port)
"updated database dump\n")))
- (call-with-new-thread
+ (define recent-changes-count-metric
+ (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))
+
+ (spawn-fiber
(lambda ()
(while #t
- (let ((recent-changes-id-for-deletion
- (database-get-recent-changes-id-for-deletion database
- recent-changes-limit)))
- (when recent-changes-id-for-deletion
- (update-database-dump)
-
- (let ((deleted-recent-changes
- (database-delete-recent-changes-with-id-below
- database
- recent-changes-id-for-deletion)))
- (simple-format (current-error-port)
- "deleted ~A recent changes\n"
- deleted-recent-changes)))
-
- (sleep check-interval))))))
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception in recent change removal thread: ~A\n"
+ exn)
+ (sleep 120))
+ (lambda ()
+ (let ((recent-changes-id-for-deletion
+ (database-get-recent-changes-id-for-deletion database
+ recent-changes-limit)))
+ (when recent-changes-id-for-deletion
+ (when database-dump-filename
+ (update-database-dump))
+
+ (let ((deleted-recent-changes
+ (database-call-with-transaction
+ database
+ (lambda _
+ (database-delete-recent-changes-with-id-below
+ database
+ recent-changes-id-for-deletion)))))
+
+ (metric-decrement recent-changes-count-metric
+ #:by deleted-recent-changes)
+
+ (simple-format (current-error-port)
+ "deleted ~A recent changes\n"
+ deleted-recent-changes)))
+
+ (sleep check-interval)))
+ #:unwind? #t)))))
+
+(define (start-recent-change-listener-fiber database
+ metrics-registry
+ addition-channel
+ removal-channel)
+ (define recent-changes-count-metric
+ (metrics-registry-fetch-metric metrics-registry "recent_changes_count"))
+
+ (define (process-addition-change change-details)
+ (let ((narinfo
+ (call-with-input-string
+ (assq-ref change-details 'data)
+ (lambda (port)
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+ (for-each
+ (lambda (uri)
+ (log-msg 'DEBUG "processing recent addition of " (uri-path uri))
+ (put-message addition-channel (list 'addition (uri-path uri))))
+ (narinfo-uris narinfo))))
+
+ (define (process-removal-change change-details)
+ (log-msg 'DEBUG "processing recent change triggered removal of "
+ (assq-ref change-details 'data))
+ (put-message removal-channel
+ (list 'remove (assq-ref change-details 'data))))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((recent-changes-count
+ (database-count-recent-changes database)))
+ (metric-set recent-changes-count-metric recent-changes-count)
+ (log-msg 'DEBUG recent-changes-count " recent changes in the database"))
+
+ (log-msg 'DEBUG "starting to listen for recent changes")
+ (let ((after-initial
+ (database-select-latest-recent-change-datetime database)))
+ (let loop ((after after-initial)
+ (last-processed-recent-changes
+ (database-select-recent-changes database after-initial)))
+ (sleep 10)
+
+ (match
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception in recent change listener " exn)
+ #f)
+ (lambda ()
+ (let* ((recent-changes
+ (database-select-recent-changes database after))
+ (unprocessed-recent-changes
+ (remove
+ (lambda (change-details)
+ (member change-details last-processed-recent-changes))
+ recent-changes)))
+
+ (unless (null? unprocessed-recent-changes)
+ (log-msg 'INFO "processing " (length unprocessed-recent-changes)
+ " recent changes")
+
+ (metric-increment recent-changes-count-metric
+ #:by (length unprocessed-recent-changes))
+
+ (for-each
+ (lambda (change-details)
+ (let ((change (assq-ref change-details 'change)))
+ (cond
+ ((string=? change "addition")
+ (process-addition-change change-details))
+ ((string=? change "removal")
+ (process-removal-change change-details))
+ (else #f))))
+ unprocessed-recent-changes))
+ ;; Return the changes handled on this pass so they're
+ ;; skipped if they show up again on the next pass
+ unprocessed-recent-changes))
+ #:unwind? #t)
+ (#f (loop after '()))
+ (recent-changes
+ (if (null? recent-changes)
+ (loop after last-processed-recent-changes)
+ (loop (assq-ref (last recent-changes)
+ 'datetime)
+ recent-changes)))))))))
diff --git a/nar-herder/server.scm b/nar-herder/server.scm
index 522ff3f..a714fb3 100644
--- a/nar-herder/server.scm
+++ b/nar-herder/server.scm
@@ -19,39 +19,65 @@
(define-module (nar-herder server)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-11)
+ #:use-module (srfi srfi-19)
#:use-module (srfi srfi-34)
+ #:use-module (srfi srfi-71)
+ #:use-module (ice-9 ftw)
+ #:use-module (ice-9 iconv)
#:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
#:use-module (ice-9 binary-ports)
#:use-module (rnrs bytevectors)
#:use-module (web uri)
+ #:use-module (web client)
#:use-module (web response)
#:use-module (web request)
#:use-module (logging logger)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers scheduler)
+ #:use-module (fibers conditions)
+ #:use-module (fibers operations)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((system foreign)
#:select (bytevector->pointer pointer->bytevector))
#:use-module (guix store)
#:use-module (guix base32)
+ #:use-module (guix progress)
#:use-module (guix serialization)
#:use-module ((guix utils)
#:select (decompressed-port))
#:use-module ((guix build utils)
#:select (dump-port))
+ #:use-module ((guix build syscalls) #:select (set-thread-name))
#:use-module (nar-herder database)
#:use-module (nar-herder storage)
+ #:use-module (nar-herder utils)
+ #:use-module (nar-herder mirror)
+ #:use-module (nar-herder recent-changes)
+ #:use-module (nar-herder cached-compression)
#:use-module (ice-9 textual-ports)
- #:export (make-request-handler))
+ #:export (%compression-options
+
+ run-nar-herder-service
+ make-request-handler))
+
+(define %compression-options
+ '(gzip lzip zstd none))
(define* (render-json json #:key (extra-headers '())
(code 200))
(values (build-response
#:code code
#:headers (append extra-headers
- '((content-type . (application/json))
+ '((content-type . (application/json
+ (charset . "utf-8")))
(vary . (accept)))))
- (lambda (port)
- (scm->json json port))))
+ (call-with-encoded-output-string
+ "utf-8"
+ (lambda (port)
+ (scm->json json port)))))
(define (parse-query-string query)
(let lp ((lst (map uri-decode (string-split query (char-set #\& #\=)))))
@@ -61,32 +87,6 @@
(("") '())
(() '()))))
-(define (get-gc-metrics-updater registry)
- (define metrics
- `((gc-time-taken
- . ,(make-gauge-metric registry "guile_gc_time_taken"))
- (heap-size
- . ,(make-gauge-metric registry "guile_heap_size"))
- (heap-free-size
- . ,(make-gauge-metric registry "guile_heap_free_size"))
- (heap-total-allocated
- . ,(make-gauge-metric registry "guile_heap_total_allocated"))
- (heap-allocated-since-gc
- . ,(make-gauge-metric registry "guile_allocated_since_gc"))
- (protected-objects
- . ,(make-gauge-metric registry "guile_gc_protected_objects"))
- (gc-times
- . ,(make-gauge-metric registry "guile_gc_times"))))
-
- (lambda ()
- (let ((stats (gc-stats)))
- (for-each
- (match-lambda
- ((name . metric)
- (let ((value (assq-ref stats name)))
- (metric-set metric value))))
- metrics))))
-
(define (serve-fixed-output-file input compression proc)
;; TODO It's hard with fold-archive from (guix serialization) to
;; read just the singular file from the archive, so the following
@@ -121,10 +121,11 @@
(define (read-string p)
(utf8->string (read-byte-string p)))
- (let*-values (((port pids)
- (decompressed-port
- (string->symbol compression)
- input)))
+ (let ((port
+ pids
+ (decompressed-port
+ (string->symbol compression)
+ input)))
;; The decompressor can be an external program, so wait for it to
;; exit
@@ -147,26 +148,78 @@
(proc port size)))))
+(define (add-cached-compressions-to-narinfo initial-narinfo-contents
+ cached-narinfo-files)
+ (let ((cached-nar-strings
+ (map (lambda (cached-nar-details)
+ (let ((compression
+ (symbol->string
+ (assq-ref cached-nar-details 'compression))))
+ (string-append
+ "URL: nar/" compression "/"
+ (uri-encode
+ (store-path-base
+ (assq-ref cached-nar-details 'store-path)))
+ "\n"
+ "Compression: " compression "\n"
+ "FileSize: " (number->string
+ (assq-ref cached-nar-details 'size))
+ "\n")))
+ cached-narinfo-files)))
+ (string-append
+ initial-narinfo-contents
+ (string-join
+ cached-nar-strings
+ "\n"))))
+
(define* (make-request-handler database storage-root
- #:key ttl negative-ttl logger
- metrics-registry)
+ #:key base-ttl base-cached-compressions-ttl
+ negative-ttl logger
+ metrics-registry
+ maybe-trigger-creation-of-cached-nars
+ cached-compression-nar-requested-hook)
+ (define hostname
+ (gethostname))
+
(define (narinfo? str)
(and
(= (string-length str) 40)
(string-suffix? ".narinfo" str)))
+ (define plain-metrics-registry
+ (make-metrics-registry))
+
(define gc-metrics-updater
- (get-gc-metrics-updater metrics-registry))
+ (get-gc-metrics-updater plain-metrics-registry))
+
+ (define process-metrics-updater
+ (get-process-metrics-updater plain-metrics-registry))
+
+ (define guile-time-metrics-updater
+ (let ((internal-real-time
+ (make-gauge-metric plain-metrics-registry "guile_internal_real_time"))
+ (internal-run-time
+ (make-gauge-metric plain-metrics-registry "guile_internal_run_time")))
+ (lambda ()
+ (metric-set internal-real-time
+ (get-internal-real-time))
+ (metric-set internal-run-time
+ (get-internal-run-time)))))
(define requests-total-metric
(make-counter-metric metrics-registry
"server_requests_total"))
- (define (increment-request-metric category response-code)
+ (define* (increment-request-metric category response-code #:key (labels '()))
(metric-increment
requests-total-metric
#:label-values `((category . ,category)
- (response_code . ,response-code))))
+ (response_code . ,response-code)
+ ,@labels)))
+
+ (define %compression-strings
+ (map symbol->string
+ %compression-options))
(lambda (request body)
(log-msg logger
@@ -178,30 +231,81 @@
(match (cons (request-method request)
(split-and-decode-uri-path
(uri-path (request-uri request))))
- (('GET (? narinfo? narinfo))
- (let ((narinfo-contents
+ (((or 'HEAD 'GET) (? narinfo? narinfo))
+ (let ((base-narinfo-contents
+ narinfo-id
(database-select-narinfo-contents-by-hash
database
(string-take narinfo 32))))
(increment-request-metric "narinfo"
- (if narinfo-contents
+ (if base-narinfo-contents
"200"
"404"))
- (if narinfo-contents
- (values `((content-type . (text/plain))
- ,@(if ttl
- `((cache-control (max-age . ,ttl)))
- '()))
- narinfo-contents)
+ (if base-narinfo-contents
+ (let* ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (narinfo-contents
+ (if (null? cached-narinfo-files)
+ base-narinfo-contents
+ (add-cached-compressions-to-narinfo
+ base-narinfo-contents
+ cached-narinfo-files)))
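+ ;; Collect the candidate TTLs: the configured one, and the
+ ;; time remaining (less a minute) before any scheduled
+ ;; removal of this narinfo or its cached nars; the smallest
+ ;; is used below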
+ (potential-ttls
+ (remove
+ not
+ `(,(if (null? cached-narinfo-files)
+ base-ttl
+ base-cached-compressions-ttl)
+
+ ,(and=> (database-select-scheduled-narinfo-removal
+ database
+ narinfo-id)
+ (lambda (scheduled-removal-time)
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0)))
+
+ ,@(if (null? cached-narinfo-files)
+ '()
+ (map
+ (lambda (details)
+ (and=>
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref details 'id))
+ (lambda (scheduled-removal-time)
+ (max
+ (- (time-second
+ (time-difference scheduled-removal-time
+ (current-time)))
+ 60)
+ 0))))
+ cached-narinfo-files)))))
+ (ttl
+ (cond
+ ((null? potential-ttls) #f)
+ (else (apply min potential-ttls)))))
+
+ (values `((content-type . (text/plain))
+ ,@(if ttl
+ `((cache-control (max-age . ,ttl)))
+ '()))
+ narinfo-contents))
(values (build-response #:code 404
#:headers (if negative-ttl
`((cache-control
(max-age . ,negative-ttl)))
'()))
"404"))))
- (('GET (? narinfo? narinfo) "info")
+ (((or 'HEAD 'GET) (? narinfo? narinfo) "info")
(let ((narinfo-contents
(database-select-narinfo-contents-by-hash
database
@@ -220,42 +324,133 @@
(string-take narinfo 32)))))
(values (build-response #:code 404)
"404"))))
- (('GET "nar" compression filename)
- (let* ((hash (string-take filename 32))
+ (((or 'HEAD 'GET) "nar" compression filename)
+ (let* ((hash (and (>= (string-length filename) 32)
+ (string-take filename 32)))
+ (narinfo
+ (and hash
+ (database-select-narinfo-by-hash
+ database
+ hash)))
(narinfo-files
- (database-select-narinfo-files
- database
- hash))
+ (and=> (assq-ref narinfo 'id)
+ (lambda (id)
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ id))))
(narinfo-file-for-compression
(find (lambda (file)
- (string=? (assq-ref file 'compression)
- compression))
- narinfo-files)))
-
- (when (or narinfo-file-for-compression
- ;; Check for a common compression to avoid lots of
- ;; metrics being generated if compression is random
- (member compression '("gzip" "lzip" "zstd")))
- (increment-request-metric
- (string-append "nar/"
- compression)
- (if narinfo-file-for-compression "200" "404")))
+ (and (string=? (assq-ref file 'compression)
+ compression)
+ (string=?
+ (last (string-split (assq-ref file 'url)
+ #\/))
+ (uri-encode filename))))
+ (or narinfo-files '())))
+ (compression-symbol
+ (if (member
+ compression
+ %compression-strings
+ string=?)
+ (string->symbol compression)
+ #f)))
(if narinfo-file-for-compression
- (values (build-response
- #:code 200
- #:headers `((X-Accel-Redirect
- . ,(string-append
- "/internal/nar/"
- compression "/"
- (uri-encode filename)))))
- #f)
- (values (build-response #:code 404)
- "404"))))
- (('GET "file" name algo hash)
+ (let ((loop?
+ (any
+ (lambda (via)
+ (string=? (last (string-split via #\space))
+ hostname))
+ (request-via request))))
+
+ (when (and (not loop?)
+ maybe-trigger-creation-of-cached-nars)
+ (maybe-trigger-creation-of-cached-nars
+ (assq-ref narinfo 'id)))
+
+ (when loop?
+ (log-msg logger 'WARN
+ (request-method request)
+ " "
+ (uri-path (request-uri request))
+ ": loop detected (" hostname "): "
+ (string-join (request-via request) ", ")))
+
+ (increment-request-metric
+ (string-append "nar/"
+ compression)
+ (if loop?
+ "500"
+ "200")
+ #:labels
+ (let ((system (assq-ref narinfo 'system)))
+ (if (string? system)
+ `((system . ,system))
+ '())))
+
+ (if loop?
+ (values (build-response #:code 500)
+ (simple-format #f "loop detected (~A): ~A\n"
+ hostname
+ (request-via request)))
+ (values (build-response
+ #:code 200
+ #:headers `((X-Accel-Redirect
+ . ,(string-append
+ "/internal/nar/"
+ compression "/"
+ (uri-encode filename)))))
+ #f)))
+ (let ((cached-narinfo-file
+ (and narinfo ; must be a known hash
+ compression-symbol ; must be a known compression
+ ;; Check that the filename given in the
+ ;; request matches the narinfo store-path
+ (string=? filename
+ (basename
+ (assq-ref narinfo 'store-path)))
+ (database-select-cached-narinfo-file-by-hash
+ database
+ hash
+ compression-symbol))))
+
+ (when (or cached-narinfo-file
+ ;; Check for a common compression to avoid lots of
+ ;; metrics being generated if compression is random
+ compression-symbol)
+ (increment-request-metric
+ (string-append "nar/"
+ compression)
+ (if cached-narinfo-file "200" "404")
+ #:labels
+ (if cached-narinfo-file
+ (let ((system (assq-ref narinfo 'system)))
+ (if (string? system)
+ `((system . ,system))
+ '()))
+ '())))
+
+ (when cached-narinfo-file
+ (cached-compression-nar-requested-hook compression-symbol
+ filename))
+
+ (if cached-narinfo-file
+ (values (build-response
+ #:code 200
+ #:headers `((X-Accel-Redirect
+ . ,(string-append
+ "/internal/cached-nar/"
+ compression "/"
+ (uri-encode filename)))))
+ #f)
+ (values (build-response #:code 404)
+ "404"))))))
+ (((or 'HEAD 'GET) "file" name algo hash)
(guard (c ((invalid-base32-character? c)
(values (build-response #:code 404)
- "404")))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ "404"))))
(let ((hash-bytevector (nix-base32-string->bytevector hash)))
(if (and (string=? algo "sha256") (= 32 (bytevector-length
hash-bytevector)))
@@ -272,43 +467,54 @@
store-path-hash))
(selected-narinfo-file
;; TODO Select intelligently
- (first narinfo-files)))
+ (if (null? narinfo-files)
+ #f
+ (first narinfo-files)))
+ (filename
+ (and selected-narinfo-file
+ (let ((filename
+ (string-append
+ storage-root
+ (uri-decode
+ (assq-ref selected-narinfo-file 'url)))))
+ (and (file-exists? filename)
+ filename)))))
(increment-request-metric
"file"
- (if selected-narinfo-file "200" "404"))
-
- (if selected-narinfo-file
- (let* ((url
- (assq-ref selected-narinfo-file 'url))
- (filename
- (string-append storage-root
- (uri-decode url))))
-
- (serve-fixed-output-file
- (open-input-file filename)
- (assq-ref selected-narinfo-file
- 'compression)
- (lambda (nar-port bytes)
- (values `((content-type . (application/octet-stream
- (charset . "ISO-8859-1")))
- (content-length . ,bytes))
- (lambda (output-port)
- (dump-port nar-port
- output-port
- bytes)
-
- (close-port output-port))))))
+ (if filename "200" "404"))
+
+ (if filename
+ (serve-fixed-output-file
+ (open-input-file filename)
+ (assq-ref selected-narinfo-file
+ 'compression)
+ (lambda (nar-port bytes)
+ (values `((content-type . (application/octet-stream
+ (charset . "ISO-8859-1")))
+ (content-length . ,bytes))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ (lambda (output-port)
+ (dump-port nar-port
+ output-port
+ bytes)
+
+ (close-port nar-port))))))
(values (build-response #:code 404)
- "404")))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ "404"))))
(begin
(increment-request-metric "file" "404")
(values (build-response #:code 404)
- "404"))))))
+ (if (eq? (request-method request) 'HEAD)
+ #f
+ "404")))))))
- (('GET "recent-changes")
+ (((or 'HEAD 'GET) "recent-changes")
(let ((query-parameters
(or (and=> (uri-query (request-uri request))
parse-query-string)
@@ -323,7 +529,7 @@
(or
(assoc-ref query-parameters "since")
"1970-01-01 00:00:01"))))))))
- (('GET "latest-database-dump")
+ (((or 'HEAD 'GET) "latest-database-dump")
(increment-request-metric "latest-database-dump" "200")
@@ -331,20 +537,378 @@
#:code 200
#:headers '((X-Accel-Redirect . "/internal/database/nar_herder_dump.db")))
#f))
- (('GET "metrics")
+ (((or 'HEAD 'GET) "metrics")
(gc-metrics-updater)
+ (process-metrics-updater)
+ (guile-time-metrics-updater)
+ (update-database-metrics! database)
(increment-request-metric "metrics" "200")
(values (build-response
#:code 200
#:headers '((content-type . (text/plain))
(vary . (accept))))
- (lambda (port)
- (write-metrics metrics-registry
- port))))
+ (call-with-output-string
+ (lambda (port)
+ (write-metrics metrics-registry port)
+ (write-metrics plain-metrics-registry port)))))
(_
(increment-request-metric "unhandled" "404")
(values (build-response #:code 404)
"404")))))
+(define* (run-nar-herder-service opts lgr)
+ (define (download-database)
+ (let ((database-uri
+ (string->uri
+ (string-append (assq-ref opts 'mirror)
+ "/latest-database-dump"))))
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (simple-format (current-error-port)
+ "starting downloading the database\n")
+ (let ((port
+ socket
+ (open-socket-for-uri* database-uri)))
+ (http-get database-uri
+ #:port port
+ #:streaming? #t)))
+ (lambda (response body)
+ (when (not (= (response-code response) 200))
+ (error "unable to fetch database from mirror"))
+
+ (let* ((reporter (progress-reporter/file
+ (uri->string database-uri)
+ (response-content-length response)
+ (current-error-port)))
+ (port
+ (progress-report-port
+ reporter
+ body
+ #:download-size (response-content-length response))))
+
+ (call-with-output-file (assq-ref opts 'database)
+ (lambda (output-port)
+ (dump-port port output-port)))
+
+ (close-port port))
+
+ (simple-format (current-error-port)
+ "finished downloading the database\n"))))
+ #:timeout 30)))
+
+ (define metrics-registry
+ (make-metrics-registry
+ #:namespace
+ "narherder"))
+
+ (and=>
+ (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (let ((database-file (assq-ref opts 'database)))
+ (if (file-exists? database-file)
+ (begin
+ ;; TODO Open the database, and check if the
+ ;; latest changes in the database are visible on
+ ;; the source to mirror. If they're not, then
+ ;; delete the database and download it to get
+ ;; back in sync
+
+ #f)
+ (download-database)))))
+
+ ;; Used elsewhere
+ (make-gauge-metric metrics-registry "recent_changes_count")
+
+ (let ((recent-changes-metric
+ (make-gauge-metric metrics-registry "recent_changes_limit")))
+ (metric-set recent-changes-metric (assq-ref opts 'recent-changes-limit)))
+
+ (define maintenance-scheduler
+ (make-scheduler #:parallelism 1))
+
+ (let* ((database (setup-database (assq-ref opts 'database)
+ metrics-registry
+ #:reader-threads
+ (assq-ref opts 'database-reader-threads)))
+ (canonical-storage (and=> (assq-ref opts 'storage)
+ canonicalize-path))
+
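+ ;; Combine the various per-compression options into one alist
+ ;; per enabled cached compression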
+ (enabled-cached-compressions
+ (let ((explicit-cached-compression-directories
+ (filter-map
+ (match-lambda
+ (('cached-compression-directory . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-directories-max-sizes
+ (filter-map
+ (match-lambda
+ (('cached-compression-directory-max-size . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-new-ttls
+ (filter-map
+ (match-lambda
+ (('cached-compression-new-ttl . details) details)
+ (_ #f))
+ opts))
+ (cached-compression-unused-removal-durations
+ (filter-map
+ (match-lambda
+ (('cached-compression-unused-removal-duration . details)
+ details)
+ (_ #f))
+ opts)))
+
+ (filter-map
+ (match-lambda
+ (('cached-compression . details)
+ (let ((compression
+ (assq-ref details 'type)))
+ (cons compression
+ `(,@(alist-delete 'type details)
+ (directory
+ . ,(or (assq-ref explicit-cached-compression-directories
+ compression)
+ (simple-format #f "/var/cache/nar-herder/nar/~A"
+ compression)))
+ (directory-max-size
+ . ,(assq-ref cached-compression-directories-max-sizes
+ compression))
+ (ttl
+ . ,(assq-ref cached-compression-ttls
+ compression))
+ (new-ttl
+ . ,(assq-ref cached-compression-new-ttls
+ compression))
+ (unused-removal-duration
+ . ,(assq-ref cached-compression-unused-removal-durations
+ compression))))))
+ (_ #f))
+ opts)))
+
+ (cached-compression-min-uses
+ (assq-ref opts 'cached-compression-min-uses))
+
+ (cached-compression-management-channel
+ (if (null? enabled-cached-compressions)
+ #f
+ (start-cached-compression-management-fiber
+ database
+ metrics-registry
+ (or (assq-ref opts 'cached-compression-nar-source)
+ canonical-storage)
+ enabled-cached-compressions
+ cached-compression-min-uses
+ #:cached-compression-workers
+ (assq-ref opts 'cached-compression-workers)
+ #:scheduler maintenance-scheduler)))
+
+ (maybe-trigger-creation-of-cached-nars
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (narinfo-id)
+ (spawn-fiber
+ (lambda ()
+ (put-message cached-compression-management-channel
+ (cons 'narinfo-id narinfo-id)))
+ maintenance-scheduler))))
+
+ (cached-compression-nar-requested-hook
+ (if (null? enabled-cached-compressions)
+ #f
+ (lambda (compression filename)
+ (spawn-fiber
+ (lambda ()
+ (let* ((directory
+ (assq-ref (assq-ref enabled-cached-compressions
+ compression)
+ 'directory)))
+ (utime (string-append directory "/" filename))))
+ maintenance-scheduler)))))
+
+ (if (string=? (assq-ref opts 'database-dump)
+ "disabled")
+ (log-msg 'INFO "database dump disabled")
+ (when (not (file-exists? (assq-ref opts 'database-dump)))
+ (log-msg 'INFO "dumping database...")
+ (dump-database database (assq-ref opts 'database-dump))))
+
+ (let ((finished? (make-condition)))
+ (call-with-new-thread
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name "maintenance"))
+ (const #t))
+
+ (run-fibers
+ (lambda ()
+ (when canonical-storage
+ (initialise-storage-metrics
+ database
+ canonical-storage
+ metrics-registry))
+
+ (start-recent-change-removal-and-database-dump-fiber
+ database
+ metrics-registry
+ (let ((filename (assq-ref opts 'database-dump)))
+ (if (string=? filename "disabled")
+ #f
+ filename))
+ (* 24 3600) ; 24 hours
+ (assq-ref opts 'recent-changes-limit))
+
+ (let ((mirror-channel
+ (and=>
+ (assq-ref opts 'mirror)
+ (lambda (mirror)
+ (start-fetch-changes-fiber
+ database
+ metrics-registry
+ canonical-storage ; might be #f, but that's fine here
+ mirror
+ cached-compression-management-channel)
+
+ (if (assq-ref opts 'storage)
+ (start-mirroring-fiber database
+ mirror
+ (assq-ref opts 'storage-limit)
+ canonical-storage
+ metrics-registry)
+ #f))))
+ (removal-channel
+ (let ((nar-removal-criteria
+ (filter-map
+ (match-lambda
+ ((key . val)
+ (if (eq? key 'storage-nar-removal-criteria)
+ val
+ #f)))
+ opts)))
+ (if (and (assq-ref opts 'storage)
+ (number? (assq-ref opts 'storage-limit))
+ (not (null? nar-removal-criteria)))
+ (start-nar-removal-fiber database
+ canonical-storage
+ (assq-ref opts 'storage-limit)
+ metrics-registry
+ nar-removal-criteria)
+ #f)))
+ (addition-channel (make-channel)))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "exception processing addition-channel: "
+ exn))
+ (lambda ()
+ (match (get-message addition-channel)
+ (('addition file)
+ (when mirror-channel
+ (put-message mirror-channel
+ `(fetch ,file)))
+ (when removal-channel
+ (spawn-fiber
+ (lambda ()
+ (sleep 60)
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)
+ (sleep (* 5 60))
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)
+ (sleep (* 15 60))
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)
+ (sleep 3600)
+ (removal-channel-remove-nar-from-storage removal-channel
+ file)))))))
+ #:unwind? #t))))
+
+ (start-recent-change-listener-fiber
+ database
+ metrics-registry
+ addition-channel
+ removal-channel))
+
+ (unless (null? enabled-cached-compressions)
+ (let ((cached-compression-removal-fiber-wakeup-channel
+ (start-cached-compression-removal-fiber
+ database
+ cached-compression-management-channel
+ enabled-cached-compressions)))
+ (start-cached-compression-schedule-removal-fiber
+ database
+ metrics-registry
+ cached-compression-management-channel
+ enabled-cached-compressions
+ cached-compression-removal-fiber-wakeup-channel
+ (or (assq-ref opts 'narinfo-ttl)
+ ;; Default from (guix substitutes)
+ (* 36 3600)))))
+
+ (log-msg 'DEBUG "finished maintenance setup")
+ (wait finished?))
+ #:scheduler maintenance-scheduler
+ #:hz 0
+ #:parallelism 1)))
+
+ (call-with-sigint
+ (lambda ()
+ (run-fibers
+ (lambda ()
+ (let* ((current (current-scheduler))
+ (schedulers
+ (cons current (scheduler-remote-peers current))))
+ (for-each
+ (lambda (i sched)
+ (spawn-fiber
+ (lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append "fibers " (number->string i))))
+ (const #t)))
+ sched))
+ (iota (length schedulers))
+ schedulers))
+
+ (log-msg 'INFO "starting server, listening on "
+ (assq-ref opts 'host) ":" (assq-ref opts 'port))
+
+ (run-server/patched
+ (make-request-handler
+ database
+ canonical-storage
+ #:base-ttl (or (assq-ref opts 'new-narinfo-ttl)
+ (assq-ref opts 'narinfo-ttl))
+ #:base-cached-compressions-ttl
+ (or (assq-ref opts 'new-cached-compressions-narinfo-ttl)
+ (assq-ref opts 'cached-compressions-narinfo-ttl))
+ #:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
+ #:logger lgr
+ #:metrics-registry metrics-registry
+ #:maybe-trigger-creation-of-cached-nars
+ maybe-trigger-creation-of-cached-nars
+ #:cached-compression-nar-requested-hook
+ cached-compression-nar-requested-hook)
+ #:host (assq-ref opts 'host)
+ #:port (assq-ref opts 'port))
+
+ (wait finished?))
+ #:hz 0
+ #:parallelism (assq-ref opts 'parallelism)))
+ finished?))))
diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm
index c017685..fc49b2d 100644
--- a/nar-herder/storage.scm
+++ b/nar-herder/storage.scm
@@ -18,6 +18,7 @@
(define-module (nar-herder storage)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
@@ -25,21 +26,27 @@
#:use-module (web uri)
#:use-module (web client)
#:use-module (web response)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (logging logger)
#:use-module (logging port-log)
#:use-module (prometheus)
#:use-module (json)
#:use-module ((guix build utils) #:select (dump-port mkdir-p))
#:use-module ((guix store) #:select (store-path-hash-part))
+ #:use-module (guix progress)
#:use-module (nar-herder utils)
#:use-module (nar-herder database)
#:export (store-item-in-local-storage?
remove-nar-files-by-hash
- get-nar-files
+ initialise-storage-metrics
+ check-storage
- start-nar-removal-thread
- start-mirroring-thread))
+ removal-channel-remove-nar-from-storage
+
+ start-nar-removal-fiber
+ start-mirroring-fiber))
(define (store-item-in-local-storage? database storage-root hash)
(let ((narinfo-files (database-select-narinfo-files database hash)))
@@ -52,17 +59,12 @@
(assq-ref file 'url)))))
narinfo-files)))
-(define (remove-nar-files-by-hash database storage-root metrics-registry
- hash)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
+(define* (remove-nar-files-by-hash database storage-root metrics-registry
+ hash
+ #:key (error-unless-files-to-remove? #t))
(let ((narinfo-files (database-select-narinfo-files database hash)))
- (when (null? narinfo-files)
+ (when (and (null? narinfo-files)
+ error-unless-files-to-remove?)
(error "no narinfo files"))
(for-each
(lambda (file)
@@ -76,9 +78,14 @@
(remove-nar-from-storage storage-root
(assq-ref file 'url)))
- (metric-decrement nar-files-metric
- #:label-values
- `((stored . ,(if exists? "true" "false"))))))
+ (and=> (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (lambda (metric)
+ ;; Just update this metric if it exists, since if it
+ ;; does, it should be set to a value
+ (metric-decrement
+ metric
+ #:label-values `((stored . ,(if exists? "true" "false"))))))))
narinfo-files)))
(define (get-storage-size storage-root)
@@ -159,50 +166,223 @@
(unrecognised-files . ,(hash-map->list (lambda (key _) key)
files-hash)))))
-(define* (get-nar-files database storage-root metrics-registry
- #:key stored?)
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
- (let* ((index (index-storage database storage-root))
- (selected-files
- (filter
- (lambda (file)
- (eq? (assq-ref file 'stored?) stored?))
- (assq-ref index 'narinfo-files))))
-
- (let ((selected-files-count
- (length selected-files))
- (all-files-count
- (length (assq-ref index 'narinfo-files))))
-
- (metric-set nar-files-metric
- selected-files-count
- #:label-values `((stored . ,(if stored? "true" "false"))))
- (metric-set nar-files-metric
- (- all-files-count selected-files-count)
- #:label-values `((stored . ,(if stored? "false" "true")))))
-
- selected-files))
-
-(define (start-nar-removal-thread database
- storage-root storage-limit
- metrics-registry
- nar-removal-criteria)
+;; TODO Maybe remove the metrics-registry argument?
+(define* (fold-nar-files database storage-root metrics-registry
+ proc init
+ #:key stored?)
+ (define stored-files-count 0)
+ (define not-stored-files-count 0)
+
+ (let ((result
+ (database-fold-all-narinfo-files
+ database
+ (lambda (nar result)
+ (let* ((url
+ (uri-decode
+ (assq-ref nar 'url)))
+ (nar-stored?
+ (file-exists?
+ (string-append storage-root url))))
+
+ (if nar-stored?
+ (set! stored-files-count (1+ stored-files-count))
+ (set! not-stored-files-count (1+ not-stored-files-count)))
+
+ (if (or (eq? stored? 'both)
+ (and stored? nar-stored?)
+ (and (not stored?)
+ (not nar-stored?)))
+ (proc nar result)
+ result)))
+ init)))
+
+ (values result
+ `((stored . ,stored-files-count)
+ (not-stored . ,not-stored-files-count)))))
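+
+;; fold-nar-files returns two values: the fold result and an alist of
+;; stored/not-stored counts, so callers bind both with an srfi-71 let.
+;; An illustrative sketch (collecting the URLs of locally stored nars):
+;;
+;;   (let ((urls counts
+;;          (fold-nar-files database storage-root metrics-registry
+;;                          (lambda (nar result)
+;;                            (cons (assq-ref nar 'url) result))
+;;                          '()
+;;                          #:stored? #t)))
+;;     urls)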
+
+(define* (update-nar-files-metric metrics-registry
+ nar-file-counts
+ #:key fetched-count removed-count)
+
+ ;; Avoid incrementing or decrementing the metric if it hasn't been
+ ;; set yet
+ (when (or (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (= (length nar-file-counts) 2))
+
+ (let ((nar-files-metric
+ (or (metrics-registry-fetch-metric metrics-registry
+ "nar_files_total")
+ (make-gauge-metric metrics-registry
+ "nar_files_total"
+ #:labels '(stored)))))
+
+ ;; Set the values if the counts are known
+ (and=>
+ (assq-ref nar-file-counts 'stored)
+ (lambda (stored-count)
+ (metric-set nar-files-metric
+ stored-count
+ #:label-values '((stored . "true")))))
+ (and=>
+ (assq-ref nar-file-counts 'not-stored)
+ (lambda (not-stored-count)
+ (metric-set nar-files-metric
+ not-stored-count
+ #:label-values '((stored . "false")))))
+
+ ;; Then adjust by the fetched or removed counts
+ (when fetched-count
+ (metric-increment nar-files-metric
+ #:by fetched-count
+ #:label-values '((stored . "true")))
+ (metric-decrement nar-files-metric
+ #:by fetched-count
+ #:label-values '((stored . "false"))))
+ (when removed-count
+ (metric-decrement nar-files-metric
+ #:by removed-count
+ #:label-values '((stored . "true")))
+ (metric-increment nar-files-metric
+ #:by removed-count
+ #:label-values '((stored . "false")))))))
+
+(define (initialise-storage-metrics database storage-root metrics-registry)
+ ;; Use a database transaction to block changes
+ (database-call-with-transaction
+ database
+ (lambda _
+ (log-msg 'INFO "starting to initialise storage metrics")
+ (let ((_
+ counts
+ (fold-nar-files
+ database
+ storage-root
+ metrics-registry
+ (const #f)
+ #f
+ #:stored? 'both)))
+ (update-nar-files-metric
+ metrics-registry
+ counts))
+ (log-msg 'INFO "finished initialising storage metrics"))))
+
+(define (check-storage database storage-root metrics-registry)
+ (define files-count
+ (database-count-narinfo-files database))
+
+ (call-with-progress-reporter
+ (progress-reporter/bar files-count
+ (simple-format #f "checking ~A files" files-count)
+ (current-error-port))
+ (lambda (report)
+ (fold-nar-files
+ database
+ storage-root
+ metrics-registry
+ (lambda (file _)
+ (let* ((full-filename
+ (string-append storage-root
+ (uri-decode (assq-ref file 'url))))
+ (file-size
+ (stat:size (stat full-filename)))
+ (database-size
+ (assq-ref file 'size)))
+ (report)
+ (unless (= file-size database-size)
+ (newline)
+ (log-msg 'WARN "file " full-filename
+ " has inconsistent size (database: "
+ database-size ", file: " file-size ")"))
+ #f))
+ #f
+ #:stored? 'both))))
+
+(define (at-most max-length lst)
+ "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise
+return its MAX-LENGTH first elements and its tail."
+ (let loop ((len 0)
+ (lst lst)
+ (result '()))
+ (match lst
+ (()
+ (values (reverse result) '()))
+ ((head . tail)
+ (if (>= len max-length)
+ (values (reverse result) lst)
+ (loop (+ 1 len) tail (cons head result)))))))
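+
+;; A sketch of the behaviour described in the docstring:
+;;   (at-most 2 '(a b c d)) => (a b), (c d)
+;;   (at-most 5 '(a b))     => (a b), ()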
+
+(define %max-cached-connections
+ ;; Maximum number of connections kept in cache by
+  ;; 'open-socket-for-uri/cached'.
+ 16)
+
+(define open-socket-for-uri/cached
+ (let ((cache '()))
+ (lambda* (uri #:key fresh? verify-certificate?)
+ "Return a connection for URI, possibly reusing a cached connection.
+When FRESH? is true, delete any cached connections for URI and open a new one.
+Return #f if URI's scheme is 'file' or #f.
+
+When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates."
+ (define host (uri-host uri))
+ (define scheme (uri-scheme uri))
+ (define key (list host scheme (uri-port uri)))
+
+ (and (not (memq scheme '(file #f)))
+ (match (assoc-ref cache key)
+ (#f
+ ;; Open a new connection to URI and evict old entries from
+ ;; CACHE, if any.
+ (let ((socket
+ (open-socket-for-uri*
+ uri
+ #:verify-certificate? verify-certificate?))
+ (new-cache evicted
+ (at-most (- %max-cached-connections 1) cache)))
+ (for-each (match-lambda
+ ((_ . port)
+ (false-if-exception (close-port port))))
+ evicted)
+ (set! cache (alist-cons key socket new-cache))
+ socket))
+ (socket
+ (if (or fresh? (port-closed? socket))
+ (begin
+ (false-if-exception (close-port socket))
+ (set! cache (alist-delete key cache))
+ (open-socket-for-uri/cached uri
+ #:verify-certificate?
+ verify-certificate?))
+ (begin
+ ;; Drain input left from the previous use.
+ (drain-input socket)
+ socket))))))))
+
+(define (call-with-cached-connection uri proc)
+ (let ((port (open-socket-for-uri/cached uri)))
+ (with-throw-handler #t
+ (lambda ()
+ (proc port))
+ (lambda _
+ (close-port port)))))
+
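+;; An illustrative use of the cache (this mirrors check-removal-criteria
+;; below): wrap the request in call-with-cached-connection so the cached
+;; socket is closed if the request throws.
+;;
+;;   (call-with-cached-connection uri
+;;     (lambda (port)
+;;       (http-get uri
+;;                 #:port port
+;;                 #:keep-alive? #t
+;;                 #:streaming? #t)))
+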
+(define (removal-channel-remove-nar-from-storage
+ channel file)
+ (let ((reply (make-channel)))
+ (put-message channel (list 'remove-from-storage reply file))
+ (get-message reply)))
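+
+;; The fiber started by start-nar-removal-fiber replies with one of the
+;; symbols 'removed, 'does-not-exist or 'removal-criteria-not-met, or
+;; with a pair ('exn . <exception>) if removal itself failed (see the
+;; message loop below).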
+
+(define (start-nar-removal-fiber database
+ storage-root storage-limit
+ metrics-registry
+ nar-removal-criteria)
(define storage-size-metric
(make-gauge-metric metrics-registry
"storage_size_bytes"))
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
+ (define removal-channel
+ (make-channel))
(define (check-removal-criteria nar criteria)
(define narinfo
@@ -222,21 +402,29 @@
(store-path-hash-part
(assq-ref narinfo 'store-path))
".narinfo/info"))))
- (call-with-values
- (lambda ()
- (retry-on-error
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
(lambda ()
- (http-get uri #:decode-body? #f))
- #:times 3
- #:delay 5))
- (lambda (response body)
- (and (= (response-code response)
- 200)
-
- (let ((json-body (json-string->scm
- (utf8->string body))))
- (eq? (assoc-ref json-body "stored")
- #t)))))))))
+ (retry-on-error
+ (lambda ()
+ (call-with-cached-connection uri
+ (lambda (port)
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:keep-alive? #t
+ #:streaming? #t))))
+ #:times 3
+ #:delay 5))
+ (lambda (response body)
+ (and (= (response-code response)
+ 200)
+
+ (let ((json-body (json->scm body)))
+ (eq? (assoc-ref json-body "stored")
+ #t))))))
+ #:timeout 30)))))
(define (nar-can-be-removed? nar)
(any (lambda (criteria)
@@ -252,43 +440,98 @@
(metric-set storage-size-metric
initial-storage-size)
- (let loop ((storage-size
- initial-storage-size)
- (stored-nar-files
- (with-time-logging "getting stored nar files"
- (get-nar-files database storage-root metrics-registry
- #:stored? #t))))
- ;; Look through items in local storage, check if the removal
- ;; criteria have been met, and if so, delete it
-
- (unless (null? stored-nar-files)
- (let ((nar-to-consider (car stored-nar-files)))
- (if (nar-can-be-removed? nar-to-consider)
- (begin
- (remove-nar-from-storage
- storage-root
- (uri-decode
- (assq-ref nar-to-consider 'url)))
-
- (metric-decrement nar-files-metric
- #:label-values '((stored . "true")))
- (metric-increment nar-files-metric
- #:label-values '((stored . "false")))
-
- (let ((storage-size-estimate
- (- storage-size
- (assq-ref nar-to-consider 'size))))
- (when (> storage-size-estimate storage-limit)
- (loop storage-size-estimate
- (cdr stored-nar-files)))))
- (loop storage-size
- (cdr stored-nar-files)))))))
- (log-msg 'INFO "finished looking for nars to remove"))
+ ;; Look through items in local storage, check if the removal
+ ;; criteria have been met, and if so, delete it
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ metrics-registry
+ (lambda (nar result)
+ (match result
+ ((storage-size . removed-count)
+ (if (and (> storage-size storage-limit)
+ (nar-can-be-removed? nar))
+ (let ((response
+ (removal-channel-remove-nar-from-storage
+ removal-channel
+ (assq-ref nar 'url))))
+
+ (if (eq? response 'removed)
+ (let ((storage-size-estimate
+ (- storage-size
+ (assq-ref nar 'size))))
+ (cons storage-size-estimate
+ (+ removed-count 1)))
+ (cons storage-size
+ removed-count)))
+ (cons storage-size
+ removed-count)))))
+ (cons initial-storage-size 0)
+ #:stored? #t)))
+
+ (match result
+ ((storage-size . removed-count)
+
+ (log-msg 'INFO "finished looking for nars to remove, removed "
+ removed-count " files"))))))
(when (null? nar-removal-criteria)
(error "must be some removal criteria"))
- (call-with-new-thread
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (match (get-message removal-channel)
+ (('remove-from-storage reply file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "nar remove from storage failed ("
+ file "): " exn)
+ (put-message reply
+ (cons 'exn exn)))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (cond
+ ((not (file-exists?
+ (string-append storage-root
+ (uri-decode file))))
+ (put-message reply 'does-not-exist))
+ ((not (nar-can-be-removed?
+ `((url . ,file))))
+ (put-message reply
+ 'removal-criteria-not-met))
+ (else
+ (remove-nar-from-storage
+ storage-root
+ (uri-decode file))
+
+ (update-nar-files-metric
+ metrics-registry
+ '()
+ #:removed-count 1)
+
+ (put-message reply 'removed))))
+ (lambda _
+ (backtrace))))
+ #:unwind? #t))
+ (('remove file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to remove " file ": " exn))
+ (lambda ()
+ ;; TODO: Do more checking at this point
+ (remove-nar-from-storage
+ storage-root
+ (uri-decode file))
+ (update-nar-files-metric metrics-registry
+ '()
+ #:removed-count 1))
+ #:unwind? #t))))))
+
+ (spawn-fiber
(lambda ()
(while #t
(with-exception-handler
@@ -296,11 +539,12 @@
(log-msg 'ERROR "nar removal pass failed " exn))
run-removal-pass
#:unwind? #t)
+ (sleep (* 60 60 24)))))
- (sleep 300)))))
+ removal-channel)
-(define (start-mirroring-thread database mirror storage-limit storage-root
- metrics-registry)
+(define (start-mirroring-fiber database mirror storage-limit storage-root
+ metrics-registry)
(define no-storage-limit?
(not (integer? storage-limit)))
@@ -309,13 +553,6 @@
(make-gauge-metric metrics-registry
"storage_size_bytes"))
- (define nar-files-metric
- (or (metrics-registry-fetch-metric metrics-registry
- "nar_files_total")
- (make-gauge-metric metrics-registry
- "nar_files_total"
- #:labels '(stored))))
-
(define (fetch-file file)
(let* ((string-url
(string-append mirror file))
@@ -333,83 +570,149 @@
(when (file-exists? tmp-file-name)
(delete-file tmp-file-name))
- (call-with-values
- (lambda ()
- (http-get uri
- #:decode-body? #f
- #:streaming? #t))
- (lambda (response body)
- (unless (= (response-code response)
- 200)
- (error "unknown response code"))
-
- (call-with-output-file tmp-file-name
- (lambda (output-port)
- (dump-port body output-port)))
- (rename-file tmp-file-name
- destination-file-name)
-
- (metric-increment nar-files-metric
- #:label-values '((stored . "true")))
- (metric-decrement nar-files-metric
- #:label-values '((stored . "false")))))))
+ (with-exception-handler
+ (lambda (exn)
+ (when (file-exists? tmp-file-name)
+ (delete-file tmp-file-name))
+
+ (raise-exception exn))
+ (lambda ()
+ (with-port-timeouts
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (let ((port
+ socket
+ (open-socket-for-uri* uri)))
+ (http-get uri
+ #:port port
+ #:decode-body? #f
+ #:streaming? #t)))
+ (lambda (response body)
+ (unless (= (response-code response)
+ 200)
+ (error "unknown response code"
+ (response-code response)))
+
+ (call-with-output-file tmp-file-name
+ (lambda (output-port)
+ (dump-port body output-port))))))
+ #:timeout 30))
+ #:unwind? #t)
+
+ (rename-file tmp-file-name
+ destination-file-name)
+
+ (update-nar-files-metric
+ metrics-registry
+ '()
+ #:fetched-count 1)))
(define (download-nars initial-storage-size)
;; If there's free space, then consider downloading missing nars
- (when (< initial-storage-size storage-limit)
- (let loop ((storage-size initial-storage-size)
- (missing-nar-files (get-nar-files
- database storage-root metrics-registry
- #:stored? #f)))
- (unless (null? missing-nar-files)
- (let ((file (car missing-nar-files)))
- (log-msg 'DEBUG "considering "
- (assq-ref file 'url))
- (let ((file-bytes (assq-ref file 'size)))
- (if (or no-storage-limit?
- (< (+ storage-size file-bytes)
- storage-limit))
- (let ((success?
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "failed to fetch "
- (assq-ref file 'url)
- ": " exn)
- #f)
- (lambda ()
- (fetch-file (assq-ref file 'url))
- #t)
- #:unwind? #t)))
- (loop (if success?
- (+ storage-size file-bytes)
- storage-size)
- (cdr missing-nar-files)))
- ;; This file won't fit, so try the next one
- (loop storage-size
- (cdr missing-nar-files)))))))))
+ (if (< initial-storage-size storage-limit)
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ metrics-registry
+ (lambda (file result)
+ (log-msg 'DEBUG "considering "
+ (assq-ref file 'url))
+ (match result
+ ((storage-size . fetched-count)
+ (let ((file-bytes (assq-ref file 'size)))
+ (if (or no-storage-limit?
+ (< (+ storage-size file-bytes)
+ storage-limit))
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to fetch "
+ (assq-ref file 'url)
+ ": " exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (fetch-file (assq-ref file 'url)))
+ #:times 3
+ #:delay 5))
+ (lambda _
+ (backtrace)))
+ #t)
+ #:unwind? #t)))
+ (if success?
+ (cons (+ storage-size file-bytes)
+ (1+ fetched-count))
+ result))
+ ;; This file won't fit, so try the next one
+ result)))))
+          (cons initial-storage-size 0)
+ #:stored? #f)))
+
+ (match result
+ ((storage-size . fetched-count)
+ fetched-count)))
+ 0))
(define (fast-download-nars)
(define parallelism 3)
- (let ((missing-nar-files (get-nar-files
- database storage-root metrics-registry
- #:stored? #f)))
- (n-par-for-each
- parallelism
- (lambda (file)
- (log-msg 'DEBUG "considering "
- (assq-ref file 'url))
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "failed to fetch "
- (assq-ref file 'url)
- ": " exn)
- #f)
- (lambda ()
- (fetch-file (assq-ref file 'url))
- #t)
- #:unwind? #t))
- missing-nar-files)))
+ (let ((channel (make-channel)))
+ (for-each
+ (lambda _
+ (spawn-fiber
+ (lambda ()
+ (let loop ((fetched-count 0))
+ (match (get-message channel)
+ (('finished . reply)
+ (put-message reply fetched-count))
+ (url
+ (log-msg 'DEBUG "considering " url)
+ (loop
+ (+ fetched-count
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to fetch " url ": " exn)
+ 0)
+ (lambda ()
+ (retry-on-error
+ (lambda ()
+ (fetch-file url))
+ #:times 3
+ #:delay 5)
+ 1)
+ #:unwind? #t)))))))))
+ (iota parallelism))
+
+ (let ((result
+ nar-file-counts
+ (fold-nar-files
+ database
+ storage-root
+ metrics-registry
+ (lambda (nar _)
+ (put-message channel
+ (assq-ref nar 'url))
+ #f)
+ #f
+ #:stored? #f)))
+
+ (let* ((reply-channel (make-channel))
+ (fetched-count
+ (apply
+ +
+ (map
+ (lambda _
+ (put-message channel
+ (cons 'finished reply-channel))
+ (get-message reply-channel))
+ (iota parallelism)))))
+ fetched-count))))
(define (run-mirror-pass)
(log-msg 'DEBUG "running mirror pass")
@@ -417,18 +720,38 @@
(get-storage-size storage-root))))
(metric-set storage-size-metric
initial-storage-size)
- (if no-storage-limit?
- (fast-download-nars)
- (download-nars initial-storage-size)))
- (log-msg 'DEBUG "finished mirror pass"))
-
- (call-with-new-thread
- (lambda ()
- (while #t
- (with-exception-handler
- (lambda (exn)
- (log-msg 'ERROR "mirror pass failed " exn))
- run-mirror-pass
- #:unwind? #t)
-
- (sleep 300)))))
+ (let ((fetched-count
+ (if no-storage-limit?
+ (fast-download-nars)
+ (download-nars initial-storage-size))))
+ (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)"))))
+
+ (let ((channel (make-channel)))
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (match (get-message channel)
+ ('full-pass
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "mirror pass failed " exn))
+ run-mirror-pass
+ #:unwind? #t))
+ (('fetch file)
+ (with-exception-handler
+ (lambda (exn)
+ (log-msg 'ERROR "failed to mirror " file ": " exn))
+ (lambda ()
+ (fetch-file file)
+ (update-nar-files-metric metrics-registry
+ '()
+ #:fetched-count 1))
+ #:unwind? #t))))))
+
+ (spawn-fiber
+ (lambda ()
+ (while #t
+ (put-message channel 'full-pass)
+ (sleep (* 60 60 24)))))
+
+ channel))
diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm
index 2d62360..4755d33 100644
--- a/nar-herder/utils.scm
+++ b/nar-herder/utils.scm
@@ -18,44 +18,37 @@
(define-module (nar-herder utils)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9)
#:use-module (srfi srfi-19) ; time
#:use-module (ice-9 q)
- ;; #:use-module (ice-9 ftw)
- ;; #:use-module (ice-9 popen)
#:use-module (ice-9 iconv)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 atomic)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 exceptions)
#:use-module (rnrs bytevectors)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module ((ice-9 ports internal) #:select (port-poll
+ port-read-wait-fd
+ port-write-wait-fd))
#:use-module (web uri)
#:use-module (web http)
#:use-module (web client)
#:use-module (web request)
#:use-module (web response)
#:use-module (fibers)
+ #:use-module (fibers timers)
#:use-module (fibers channels)
+ #:use-module (fibers scheduler)
#:use-module (fibers conditions)
- ;; #:use-module (gcrypt pk-crypto)
- ;; #:use-module (gcrypt hash)
- ;; #:use-module (gcrypt random)
- ;; #:use-module (json)
- ;; #:use-module (guix pki)
- ;; #:use-module (guix utils)
- ;; #:use-module (guix config)
- ;; #:use-module (guix store)
- ;; #:use-module (guix status)
- ;; #:use-module (guix base64)
- ;; #:use-module (guix scripts substitute)
- #:export (call-with-streaming-http-request
- &chunked-input-ended-prematurely
- chunked-input-ended-prematurely-error?
- make-chunked-input-port*
-
- make-worker-thread-channel
+ #:use-module (fibers operations)
+ #:use-module ((guix build syscalls)
+ #:select (set-thread-name))
+ #:export (make-worker-thread-set
call-with-worker-thread
call-with-time-logging
@@ -65,171 +58,20 @@
create-work-queue
- check-locale!))
-
-;; Chunked Responses
-(define (read-chunk-header port)
- "Read a chunk header from PORT and return the size in bytes of the
-upcoming chunk."
- (match (read-line port)
- ((? eof-object?)
- ;; Connection closed prematurely: there's nothing left to read.
- (error "chunked input ended prematurely"))
- (str
- (let ((extension-start (string-index str
- (lambda (c)
- (or (char=? c #\;)
- (char=? c #\return))))))
- (string->number (if extension-start ; unnecessary?
- (substring str 0 extension-start)
- str)
- 16)))))
-
-(define &chunked-input-ended-prematurely
- (make-exception-type '&chunked-input-error-prematurely
- &external-error
- '()))
+ check-locale!
-(define make-chunked-input-ended-prematurely-error
- (record-constructor &chunked-input-ended-prematurely))
-
-(define chunked-input-ended-prematurely-error?
- (record-predicate &chunked-input-ended-prematurely))
-
-(define* (make-chunked-input-port* port #:key (keep-alive? #f))
- (define (close)
- (unless keep-alive?
- (close-port port)))
-
- (define chunk-size 0) ;size of the current chunk
- (define remaining 0) ;number of bytes left from the current chunk
- (define finished? #f) ;did we get all the chunks?
-
- (define (read! bv idx to-read)
- (define (loop to-read num-read)
- (cond ((or finished? (zero? to-read))
- num-read)
- ((zero? remaining) ;get a new chunk
- (let ((size (read-chunk-header port)))
- (set! chunk-size size)
- (set! remaining size)
- (cond
- ((zero? size)
- (set! finished? #t)
- (get-bytevector-n port 2) ; \r\n follows the last chunk
- num-read)
- (else
- (loop to-read num-read)))))
- (else ;read from the current chunk
- (let* ((ask-for (min to-read remaining))
- (read (get-bytevector-n! port bv (+ idx num-read)
- ask-for)))
- (cond
- ((eof-object? read) ;premature termination
- (raise-exception
- (make-chunked-input-ended-prematurely-error)))
- (else
- (let ((left (- remaining read)))
- (set! remaining left)
- (when (zero? left)
- ;; We're done with this chunk; read CR and LF.
- (get-u8 port) (get-u8 port))
- (loop (- to-read read)
- (+ num-read read)))))))))
- (loop to-read 0))
-
- (make-custom-binary-input-port "chunked input port" read! #f #f close))
-
-(define* (make-chunked-output-port* port #:key (keep-alive? #f)
- (buffering 1200)
- report-bytes-sent)
- (define heap-allocated-limit
- (expt 2 20)) ;; 1MiB
-
- (define (%put-string s)
- (unless (string-null? s)
- (let* ((bv (string->bytevector s "ISO-8859-1"))
- (length (bytevector-length bv)))
- (put-string port (number->string length 16))
- (put-string port "\r\n")
- (put-bytevector port bv)
- (put-string port "\r\n")
-
- (when report-bytes-sent
- (report-bytes-sent length))
- (let* ((stats (gc-stats))
- (initial-gc-times
- (assq-ref stats 'gc-times)))
- (when (> (assq-ref stats 'heap-allocated-since-gc)
- heap-allocated-limit)
- (while (let ((updated-stats (gc-stats)))
- (= (assq-ref updated-stats 'gc-times)
- initial-gc-times))
- (gc)
- (usleep 50)))))))
-
- (define (%put-char c)
- (%put-string (list->string (list c))))
-
- (define (flush) #t)
- (define (close)
- (put-string port "0\r\n\r\n")
- (force-output port)
- (unless keep-alive?
- (close-port port)))
- (let ((ret (make-soft-port
- (vector %put-char %put-string flush #f close) "w")))
- (setvbuf ret 'block buffering)
- ret))
-
-(define* (call-with-streaming-http-request uri callback
- #:key (headers '())
- (method 'PUT)
- report-bytes-sent)
- (let* ((port (open-socket-for-uri uri))
- (request
- (build-request
- uri
- #:method method
- #:version '(1 . 1)
- #:headers `((connection close)
- (Transfer-Encoding . "chunked")
- (Content-Type . "application/octet-stream")
- ,@headers)
- #:port port)))
-
- (set-port-encoding! port "ISO-8859-1")
- (setvbuf port 'block (expt 2 13))
- (with-exception-handler
- (lambda (exp)
- (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp)
- (close-port port)
- (raise-exception exp))
- (lambda ()
- (let ((request (write-request request port)))
- (let* ((chunked-output-port
- (make-chunked-output-port*
- port
- #:buffering (expt 2 12)
- #:keep-alive? #t
- #:report-bytes-sent report-bytes-sent)))
-
- ;; A SIGPIPE will kill Guile, so ignore it
- (sigaction SIGPIPE
- (lambda (arg)
- (simple-format (current-error-port) "warning: SIGPIPE\n")))
-
- (set-port-encoding! chunked-output-port "ISO-8859-1")
- (callback chunked-output-port)
- (close-port chunked-output-port)
-
- (let ((response (read-response port)))
- (let ((body (read-response-body response)))
- (close-port port)
- (values response
- body)))))))))
-
-(define* (retry-on-error f #:key times delay ignore)
+ open-socket-for-uri*
+
+ call-with-sigint
+ run-server/patched
+
+            port-timeout-error?
+
+ port-read-timeout-error?
+ port-write-timeout-error?
+ with-port-timeouts))
+
+(define* (retry-on-error f #:key times delay ignore error-hook)
(let loop ((attempt 1))
(match (with-exception-handler
(lambda (exn)
@@ -259,15 +101,26 @@ upcoming chunk."
times))
(apply values return-values))
((#f . exn)
- (if (>= attempt times)
+ (if (>= attempt
+ (- times 1))
(begin
(simple-format
(current-error-port)
- "error: ~A:\n ~A,\n giving up after ~A attempts\n"
+ "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n"
f
exn
- times)
- (raise-exception exn))
+ attempt
+ times
+ delay)
+ (when error-hook
+ (error-hook attempt exn))
+ (sleep delay)
+ (simple-format
+ (current-error-port)
+ "running last retry of ~A after ~A failed attempts\n"
+ f
+ attempt)
+ (f))
(begin
(simple-format
(current-error-port)
@@ -277,71 +130,11 @@ upcoming chunk."
attempt
times
delay)
+ (when error-hook
+ (error-hook attempt exn))
(sleep delay)
(loop (+ 1 attempt))))))))
-(define delay-logging-fluid
- (make-thread-local-fluid))
-(define delay-logging-depth-fluid
- (make-thread-local-fluid 0))
-
-(define (log-delay proc duration)
- (and=> (fluid-ref delay-logging-fluid)
- (lambda (recorder)
- (recorder proc duration))))
-
-(define* (call-with-delay-logging proc #:key (threshold 1) (args '()))
- (let ((start (get-internal-real-time))
- (trace '())
- (root-logger? (eq? #f (fluid-ref delay-logging-fluid))))
-
- (define (format-seconds seconds)
- (format #f "~4f" seconds))
-
- (call-with-values
- (lambda ()
- (with-fluid* delay-logging-depth-fluid
- (+ 1 (fluid-ref delay-logging-depth-fluid))
- (lambda ()
- (if root-logger?
- (with-fluid* delay-logging-fluid
- (lambda (proc duration)
- (set! trace
- (cons (list proc
- duration
- (fluid-ref delay-logging-depth-fluid))
- trace))
- #t)
- (lambda ()
- (apply proc args)))
- (apply proc args)))))
- (lambda vals
- (let ((elapsed-seconds
- (/ (- (get-internal-real-time)
- start)
- internal-time-units-per-second)))
- (if (and (> elapsed-seconds threshold)
- root-logger?)
- (let ((lines
- (cons
- (simple-format #f "warning: delay of ~A seconds: ~A"
- (format-seconds elapsed-seconds)
- proc)
- (map (match-lambda
- ((proc duration depth)
- (string-append
- (make-string (* 2 depth) #\space)
- (simple-format #f "~A: ~A"
- (format-seconds duration)
- proc))))
- trace))))
- (display (string-append
- (string-join lines "\n")
- "\n")))
- (unless root-logger?
- ((fluid-ref delay-logging-fluid) proc elapsed-seconds))))
- (apply values vals)))))
-
(define (call-with-time-logging name thunk)
(let ((start (current-time time-utc)))
(call-with-values
@@ -364,7 +157,9 @@ upcoming chunk."
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
- (make-time time-duration 0 0)))
+ (make-time time-duration 0 0))
+ (name "unnamed")
+ priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
@@ -384,11 +179,26 @@ upcoming chunk."
(else
thread-count-parameter)))
- (define (process-job . args)
- (with-mutex queue-mutex
- (enq! queue args)
- (start-new-threads-if-necessary (get-thread-count))
- (signal-condition-variable job-available)))
+ (define process-job
+ (if priority<?
+ (lambda* (args #:key priority)
+ (with-mutex queue-mutex
+ (enq! queue (cons priority args))
+ (set-car!
+ queue
+ (stable-sort! (car queue)
+ (lambda (a b)
+ (priority<?
+ (car a)
+ (car b)))))
+ (sync-q! queue)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))
+ (lambda args
+ (with-mutex queue-mutex
+ (enq! queue args)
+ (start-new-threads-if-necessary (get-thread-count))
+ (signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
@@ -403,11 +213,12 @@ upcoming chunk."
(define (list-jobs)
(with-mutex queue-mutex
- (append (list-copy
- (car queue))
+ (append (if priority<?
+ (map cdr (car queue))
+ (list-copy (car queue)))
(hash-fold (lambda (key val result)
- (or (and val
- (cons val result))
+ (if val
+ (cons val result)
result))
'()
running-job-args))))
@@ -416,16 +227,17 @@ upcoming chunk."
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
- "job raised exception: ~A\n"
- job-args))
+ "~A work queue, job raised exception ~A: ~A\n"
+ name job-args exn))
(lambda ()
(with-throw-handler #t
(lambda ()
(apply proc job-args))
(lambda (key . args)
- (simple-format (current-error-port)
- "exception when handling job: ~A ~A\n"
- key args)
+ (simple-format
+ (current-error-port)
+ "~A work queue, exception when handling job: ~A ~A\n"
+ name key args)
(backtrace))))
#:unwind? #t))
@@ -453,6 +265,13 @@ upcoming chunk."
(call-with-new-thread
(lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t "
+ (number->string thread-index))))
+ (const #t))
+
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
@@ -469,9 +288,13 @@ upcoming chunk."
;; the job in the mean time
(if (q-empty? queue)
#f
- (deq! queue))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))
#f)
- (deq! queue))))
+ (if priority<?
+ (cdr (deq! queue))
+ (deq! queue)))))
(if job-args
(begin
@@ -499,32 +322,38 @@ upcoming chunk."
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
- (let* ((thread-count
- (hash-count (const #t) running-job-args))
- (threads-to-start
- (- desired-count thread-count)))
- (when (> threads-to-start 0)
- (for-each
- (lambda (thread-index)
- (when (eq? (hash-ref running-job-args
- thread-index
- 'slot-free)
- 'slot-free)
- (let* ((now (current-time time-monotonic))
- (elapsed (time-difference now
- previous-thread-started-at)))
- (when (or (eq? #f thread-start-delay)
- (time>=? elapsed thread-start-delay))
- (set! previous-thread-started-at now)
- (hash-set! running-job-args
- thread-index
- #f)
- (start-thread thread-index)))))
- (iota desired-count)))))))
+ (let* ((thread-count
+ (hash-count (const #t) running-job-args))
+ (threads-to-start
+ (- desired-count thread-count)))
+ (when (> threads-to-start 0)
+ (for-each
+ (lambda (thread-index)
+ (when (eq? (hash-ref running-job-args
+ thread-index
+ 'slot-free)
+ 'slot-free)
+ (let* ((now (current-time time-monotonic))
+ (elapsed (time-difference now
+ previous-thread-started-at)))
+ (when (or (eq? #f thread-start-delay)
+ (time>=? elapsed thread-start-delay))
+ (set! previous-thread-started-at now)
+ (hash-set! running-job-args
+ thread-index
+ #f)
+ (start-thread thread-index)))))
+ (iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append name " q t")))
+ (const #t))
+
(while #t
(sleep 15)
(with-mutex queue-mutex
@@ -565,83 +394,171 @@ falling back to en_US.utf8\n"
(setlocale LC_ALL ""))
#:unwind? #t))
-(define %worker-thread-args
- (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- destructor
- lifetime
- (log-exception? (const #t)))
- "Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+(define-record-type <worker-thread-set>
+ (worker-thread-set channel arguments-parameter)
+ worker-thread-set?
+ (channel worker-thread-set-channel)
+ (arguments-parameter worker-thread-set-arguments-parameter))
+
+(define* (make-worker-thread-set initializer
+ #:key (parallelism 1)
+ (delay-logger (lambda _ #f))
+ (duration-logger (const #f))
+ destructor
+ lifetime
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
+ (define param
+ (make-parameter #f))
+
+ (define (initializer/safe)
+ (let ((args
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ name
+ initializer
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ initializer
+ (lambda args
+ (backtrace))))
+ #:unwind? #t)))
+
+ (if args
+ args
+ ;; never give up, just keep retrying
+ (begin
+ (sleep 5)
+ (initializer/safe)))))
+
+ (define (destructor/safe args)
+ (let ((success?
+ (with-exception-handler
+ (lambda (exn)
+ (simple-format
+ (current-error-port)
+ "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ name
+         destructor
+ exn)
+ #f)
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (apply destructor args)
+ #t)
+ (lambda _
+ (backtrace))))
+ #:unwind? #t)))
+
+ (or success?
+ #t
+ (begin
+ (sleep 5)
+ (destructor/safe args)))))
+
(let ((channel (make-channel)))
(for-each
- (lambda _
+ (lambda (thread-index)
(call-with-new-thread
(lambda ()
- (let init ((args (initializer)))
- (parameterize ((%worker-thread-args args))
+ (catch 'system-error
+ (lambda ()
+ (set-thread-name
+ (string-append
+ name " w t "
+ (number->string thread-index))))
+ (const #t))
+
+ (let init ((args (initializer/safe)))
+ (parameterize ((param args))
(let loop ((current-lifetime lifetime))
- (match (get-message channel)
- (((? channel? reply) sent-time (? procedure? proc))
- (let ((time-delay
- (- (get-internal-real-time)
- sent-time)))
- (delay-logger (/ time-delay
- internal-time-units-per-second))
- (put-message
- reply
- (let ((start-time (get-internal-real-time)))
- (with-exception-handler
- (lambda (exn)
- (list 'worker-thread-error
- (/ (- (get-internal-real-time)
- start-time)
- internal-time-units-per-second)
- exn))
- (lambda ()
- (with-throw-handler #t
- (lambda ()
- (call-with-values
- (lambda ()
- (apply proc args))
- (lambda vals
- (cons (/ (- (get-internal-real-time)
- start-time)
+ (let ((exception?
+ (match (get-message channel)
+ (((? channel? reply) sent-time (? procedure? proc))
+ (let ((time-delay
+ (- (get-internal-real-time)
+ sent-time)))
+ (delay-logger (/ time-delay
internal-time-units-per-second)
- vals))))
- (lambda args
- (when (match args
- (('%exception exn)
- (log-exception? exn))
- (_ #t))
- (simple-format
- (current-error-port)
- "worker-thread: exception: ~A\n" args)
- (backtrace)))))
- #:unwind? #t))))))
- (if (number? current-lifetime)
- (unless (< current-lifetime 0)
- (loop (if current-lifetime
- (- current-lifetime 1)
- #f)))
- (loop #f))))
+ proc)
+
+ (let* ((start-time (get-internal-real-time))
+ (response
+ (with-exception-handler
+ (lambda (exn)
+ (list 'worker-thread-error
+ (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ exn))
+ (lambda ()
+ (with-throw-handler #t
+ (lambda ()
+ (call-with-values
+ (lambda ()
+ (apply proc args))
+ (lambda vals
+ (cons (/ (- (get-internal-real-time)
+ start-time)
+ internal-time-units-per-second)
+ vals))))
+ (lambda args
+ (when (match args
+ (('%exception exn)
+ (log-exception? exn))
+ (_ #t))
+ (simple-format
+ (current-error-port)
+ "worker-thread: exception: ~A\n" args)
+ (backtrace)))))
+ #:unwind? #t)))
+ (put-message reply
+ response)
+
+ (match response
+ (('worker-thread-error duration _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #t)
+ ((duration . _)
+ (when duration-logger
+ (duration-logger duration proc))
+ #f))))))))
+
+ (unless (and expire-on-exception?
+ exception?)
+ (if (number? current-lifetime)
+ (unless (< current-lifetime 0)
+ (loop (if current-lifetime
+ (- current-lifetime 1)
+ #f)))
+ (loop #f))))))
+
(when destructor
- (apply destructor args))
- (init (initializer))))))
+ (destructor/safe args))
+
+ (init (initializer/safe))))))
(iota parallelism))
- channel))
-(define* (call-with-worker-thread channel proc #:key duration-logger)
+ (worker-thread-set channel
+ param)))
+
+(define* (call-with-worker-thread record proc #:key duration-logger)
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
- (let ((args (%worker-thread-args)))
+ (let ((args ((worker-thread-set-arguments-parameter record))))
(if args
(apply proc args)
(let ((reply (make-channel)))
- (put-message channel (list reply (get-internal-real-time) proc))
+ (put-message (worker-thread-set-channel record)
+ (list reply (get-internal-real-time) proc))
(match (get-message reply)
(('worker-thread-error duration exn)
(when duration-logger
@@ -651,3 +568,221 @@ If already in the worker thread, call PROC immediately."
(when duration-logger
(duration-logger duration))
(apply values result)))))))
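+
+;; A minimal usage sketch; open-db and query-db are placeholder names,
+;; not part of this module.  The initializer returns the argument list
+;; that is later applied to each PROC.
+;;
+;;   (define workers
+;;     (make-worker-thread-set
+;;      (lambda () (list (open-db "/tmp/example.db")))
+;;      #:parallelism 4
+;;      #:name "db"))
+;;
+;;   (call-with-worker-thread
+;;    workers
+;;    (lambda (db) (query-db db)))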
+
+(define* (open-socket-for-uri* uri
+ #:key (verify-certificate? #t))
+ (define tls-wrap
+ (@@ (web client) tls-wrap))
+
+ (define https?
+ (eq? 'https (uri-scheme uri)))
+
+ (define plain-uri
+ (if https?
+ (build-uri
+ 'http
+ #:userinfo (uri-userinfo uri)
+ #:host (uri-host uri)
+ #:port (or (uri-port uri) 443)
+ #:path (uri-path uri)
+ #:query (uri-query uri)
+ #:fragment (uri-fragment uri))
+ uri))
+
+ (let ((s (open-socket-for-uri plain-uri)))
+ (values
+ (if https?
+ (let ((port
+ (tls-wrap s (uri-host uri)
+ #:verify-certificate? verify-certificate?)))
+ ;; Guile/guile-gnutls don't handle the handshake happening on a non
+ ;; blocking socket, so change the behavior here.
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags)))
+ port)
+ (let ((flags (fcntl s F_GETFL)))
+ (fcntl s F_SETFL (logior O_NONBLOCK flags))
+ s))
+ s)))
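+
+;; open-socket-for-uri* returns two values: the (possibly TLS-wrapped)
+;; port to use for the request, and the underlying socket.  Callers in
+;; this change bind both with an srfi-71 let, e.g.
+;;
+;;   (let ((port socket (open-socket-for-uri* uri)))
+;;     (http-get uri #:port port #:streaming? #t))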
+
+;; Copied from (fibers web server)
+(define (call-with-sigint thunk cvar)
+ (let ((handler #f))
+ (dynamic-wind
+ (lambda ()
+ (set! handler
+ (sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
+ thunk
+ (lambda ()
+ (if handler
+ ;; restore Scheme handler, SIG_IGN or SIG_DFL.
+ (sigaction SIGINT (car handler) (cdr handler))
+ ;; restore original C handler.
+ (sigaction SIGINT #f))))))
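+
+;; Used in (nar-herder server) roughly as follows, with a fibers
+;; condition that is signalled on SIGINT:
+;;
+;;   (let ((finished? (make-condition)))
+;;     (call-with-sigint
+;;      (lambda ()
+;;        (run-fibers (lambda () (wait finished?))))
+;;      finished?))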
+
+;; This variant of run-server from the fibers library supports running
+;; multiple servers within one process.
+(define run-server/patched
+ (let ((fibers-web-server-module
+ (resolve-module '(fibers web server))))
+
+ (define set-nonblocking!
+ (module-ref fibers-web-server-module 'set-nonblocking!))
+
+ (define make-default-socket
+ (module-ref fibers-web-server-module 'make-default-socket))
+
+ (define socket-loop
+ (module-ref fibers-web-server-module 'socket-loop))
+
+ (lambda* (handler
+ #:key
+ (host #f)
+ (family AF_INET)
+ (addr (if host
+ (inet-pton family host)
+ INADDR_LOOPBACK))
+ (port 8080)
+ (socket (make-default-socket family addr port)))
+ ;; We use a large backlog by default. If the server is suddenly hit
+ ;; with a number of connections on a small backlog, clients won't
+ ;; receive confirmation for their SYN, leading them to retry --
+ ;; probably successfully, but with a large latency.
+ (listen socket 1024)
+ (set-nonblocking! socket)
+ (sigaction SIGPIPE SIG_IGN)
+ (spawn-fiber (lambda () (socket-loop socket handler))))))
+
+;; These procedures are subject to spurious wakeups.
+
+(define (readable? port)
+ "Test if PORT is writable."
+ (match (select (vector port) #() #() 0)
+ ((#() #() #()) #f)
+ ((#(_) #() #()) #t)))
+
+(define (writable? port)
+ "Test if PORT is writable."
+ (match (select #() (vector port) #() 0)
+ ((#() #() #()) #f)
+ ((#() #(_) #()) #t)))
+
+(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
+ (make-base-operation #f
+ (lambda _
+ (and (ready? (port-ready-fd port)) values))
+ (lambda (flag sched resume)
+ (define (commit)
+ (match (atomic-box-compare-and-swap! flag 'W 'S)
+ ('W (resume values))
+ ('C (commit))
+ ('S #f)))
+ (schedule-when-ready
+ sched (port-ready-fd port) commit))))
+
+(define (wait-until-port-readable-operation port)
+ "Make an operation that will succeed when PORT is readable."
+ (unless (input-port? port)
+ (error "refusing to wait forever for input on non-input port"))
+ (make-wait-operation readable? schedule-task-when-fd-readable port
+ port-read-wait-fd
+ wait-until-port-readable-operation))
+
+(define (wait-until-port-writable-operation port)
+ "Make an operation that will succeed when PORT is writable."
+ (unless (output-port? port)
+ (error "refusing to wait forever for output on non-output port"))
+ (make-wait-operation writable? schedule-task-when-fd-writable port
+ port-write-wait-fd
+ wait-until-port-writable-operation))
+
+
+
+(define &port-timeout
+ (make-exception-type '&port-timeout
+ &external-error
+ '(port)))
+
+(define make-port-timeout-error
+ (record-constructor &port-timeout))
+
+(define port-timeout-error?
+ (record-predicate &port-timeout))
+
+(define &port-read-timeout
+ (make-exception-type '&port-read-timeout
+ &port-timeout
+ '()))
+
+(define make-port-read-timeout-error
+ (record-constructor &port-read-timeout))
+
+(define port-read-timeout-error?
+ (record-predicate &port-read-timeout))
+
+(define &port-write-timeout
+ (make-exception-type '&port-write-timeout
+ &port-timeout
+ '()))
+
+(define make-port-write-timeout-error
+ (record-constructor &port-write-timeout))
+
+(define port-write-timeout-error?
+ (record-predicate &port-write-timeout))
+
+(define* (with-port-timeouts thunk
+ #:key timeout
+ (read-timeout timeout)
+ (write-timeout timeout))
+ (define (no-fibers-wait port mode timeout)
+ (define poll-timeout-ms 200)
+
+ ;; When the GC runs, it restarts the poll syscall, but the timeout
+ ;; remains unchanged! When the timeout is longer than the time
+  ;; between successive restarts, I think this renders the
+ ;; timeout useless. Therefore, this code uses a short timeout, and
+ ;; repeatedly calls poll while watching the clock to see if it has
+ ;; timed out overall.
+ (let ((timeout-internal
+ (+ (get-internal-real-time)
+ (* internal-time-units-per-second
+ (/ timeout 1000)))))
+ (let loop ((poll-value
+ (port-poll port mode poll-timeout-ms)))
+ (if (= poll-value 0)
+ (if (> (get-internal-real-time)
+ timeout-internal)
+ (raise-exception
+ (if (string=? mode "r")
+ (make-port-read-timeout-error port)
+ (make-port-write-timeout-error port)))
+ (loop (port-poll port mode poll-timeout-ms)))
+ poll-value))))
+
+ (parameterize
+ ((current-read-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation
+ (sleep-operation read-timeout)
+ (lambda ()
+ (raise-exception
+                  (make-port-read-timeout-error port))))))
+ (no-fibers-wait port "r" read-timeout))))
+ (current-write-waiter
+ (lambda (port)
+ (if (current-scheduler)
+ (perform-operation
+ (choice-operation
+ (wait-until-port-writable-operation port)
+ (wrap-operation
+ (sleep-operation write-timeout)
+ (lambda ()
+ (raise-exception
+                  (make-port-write-timeout-error port))))))
+ (no-fibers-wait port "w" write-timeout)))))
+ (thunk)))
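+
+;; Usage sketch, matching the calls elsewhere in this change: wrap code
+;; doing socket I/O (with suspendable ports installed) so that stalled
+;; reads or writes raise a port timeout exception instead of blocking
+;; forever.
+;;
+;;   (with-port-timeouts
+;;    (lambda ()
+;;      (let ((port socket (open-socket-for-uri* uri)))
+;;        (http-get uri #:port port #:streaming? #t)))
+;;    #:timeout 30)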
diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in
index d1f95d1..fafcf9f 100644
--- a/scripts/nar-herder.in
+++ b/scripts/nar-herder.in
@@ -22,6 +22,11 @@
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
+(when (and (string-prefix? "aarch64-" %host-type)
+ (not (getenv "GUILE_JIT_THRESHOLD")))
+ (setenv "GUILE_JIT_THRESHOLD" "-1")
+ (apply execlp (car (command-line)) (command-line)))
+
(setvbuf (current-output-port) 'line)
(setvbuf (current-error-port) 'line)
@@ -36,9 +41,12 @@
(srfi srfi-19)
(srfi srfi-37)
(srfi srfi-43)
+ (srfi srfi-71)
(ice-9 ftw)
(ice-9 match)
(ice-9 format)
+ (ice-9 threads)
+ (ice-9 suspendable-ports)
(web uri)
(web client)
(web response)
@@ -47,21 +55,20 @@
(logging port-log)
(prometheus)
(fibers)
- (fibers conditions)
- (fibers web server)
((guix ui) #:select (read/eval string->duration))
- (guix progress)
+ (guix store)
(guix narinfo)
+ (guix progress)
(guix derivations)
((guix store) #:select (store-path-hash-part))
((guix build utils) #:select (dump-port))
(nar-herder utils)
(nar-herder database)
- (nar-herder recent-changes)
(nar-herder storage)
- (nar-herder mirror)
(nar-herder server))
+(install-suspendable-ports!)
+
(define %valid-log-levels
'(DEBUG INFO WARN ERROR))
@@ -80,12 +87,26 @@
(lambda (opt name arg result)
(alist-cons 'storage
arg
- (alist-delete 'storage result))))))
+ (alist-delete 'storage result))))
+
+ (option '("log-level") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'log-level
+ (let ((level (string->symbol (string-upcase arg))))
+ (if (member level %valid-log-levels)
+ level
+ (error
+ (simple-format #f "unknown log level ~A\nvalid levels are: ~A\n"
+ level
+ %valid-log-levels))))
+ (alist-delete 'log-level result))))))
(define %base-option-defaults
;; Alist of default option values
`((database . ,(string-append (getcwd) "/nar_herder.db"))
- (database-dump . ,(string-append (getcwd) "/nar_herder_dump.db"))))
+ (database-dump . ,(string-append (getcwd) "/nar_herder_dump.db"))
+
+ (log-level . DEBUG)))
(define %import-options
(list (option '("tag") #t #f
@@ -96,7 +117,10 @@
(cons key value)))
(or (assq-ref result 'tags)
'()))
- (alist-delete 'tags result))))))
+ (alist-delete 'tags result))))
+ (option '("ensure-references-exist") #f #f
+ (lambda (opt name _ result)
+ (alist-cons 'ensure-references-exist #t result)))))
(define %import-options-defaults
'())
@@ -136,6 +160,111 @@
(call-with-input-string rest read))))
result)))
+ (option '("enable-cached-compression") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression
+ (match (string-split arg #\:)
+ ((type)
+ `((type . ,(string->symbol type))))
+ ((type level)
+ `((type . ,(string->symbol type))
+ (level . ,(string->number level)))))
+ result)))
+
+ (option '("cached-compression-directory") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-directory
+ (match (string-split arg #\=)
+ ((type directory)
+ (cons (string->symbol type)
+ (canonicalize-path directory))))
+ result)))
+
+ (option '("cached-compression-directory-max-size") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-directory-max-size
+ (match (string-split arg #\=)
+ ((type size)
+ (cons (string->symbol type)
+ (string->number size))))
+ result)))
+
+ (option '("cached-compression-min-uses") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-min-uses
+ (string->number arg)
+ (alist-delete 'cached-compression-min-uses
+ result))))
+
+ (option '("cached-compression-workers") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-workers
+ (string->number arg)
+ result)))
+
+ (option '("cached-compression-nar-source") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-nar-source
+ arg
+ (alist-delete 'cached-compression-nar-source
+ result))))
+ (option '("cached-compression-unused-removal-duration") #t #f
+ (lambda (opt name arg result)
+ (alist-cons
+ 'cached-compression-unused-removal-duration
+ (match (string-split arg #\=)
+ ((_)
+ (simple-format
+ (current-error-port)
+ "cached-compressions-unused-removal-duration: you must specify compression and value\n")
+ (exit 1))
+ ((type duration-string)
+ (cons (string->symbol type)
+ (let ((duration (string->duration duration-string)))
+ (unless duration
+ (simple-format
+ (current-error-port)
+ "~A: cached-compressions-unused-removal-duration: invalid duration\n"
+ arg)
+ (exit 1))
+
+ duration))))
+ result)))
+ (option '("cached-compressions-ttl") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'cached-compression-ttl
+ (match (string-split arg #\=)
+ ((_)
+ (simple-format
+ (current-error-port)
+ "cached-compressions-ttl: you must specify compression and value\n")
+ (exit 1))
+ ((type ttl-string)
+ (let ((duration (string->duration ttl-string)))
+ (unless duration
+ (simple-format (current-error-port)
+ "~A: invalid duration\n"
+ arg)
+ (exit 1))
+
+ (cons (string->symbol type)
+ (time-second duration)))))
+ result)))
+      ;; Like --cached-compressions-ttl, the argument is TYPE=DURATION
+      (option '("cached-compressions-new-ttl") #t #f
+              (lambda (opt name arg result)
+                (alist-cons 'cached-compression-new-ttl
+                            (match (string-split arg #\=)
+                              ((_)
+                               (simple-format
+                                (current-error-port)
+                                "cached-compressions-new-ttl: you must specify compression and value\n")
+                               (exit 1))
+                              ((type ttl-string)
+                               (let ((duration (string->duration ttl-string)))
+                                 (unless duration
+                                   (simple-format (current-error-port)
+                                                  "~A: invalid duration\n"
+                                                  arg)
+                                   (exit 1))
+
+                                 (cons (string->symbol type)
+                                       (time-second duration)))))
+                            result)))
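+      ;; The two TTL options above are also keyed by compression type and
+      ;; parsed with string->duration; assumed examples:
+      ;;   --cached-compressions-ttl zstd=30d
+      ;;   --cached-compressions-new-ttl zstd=1d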
+
(option '("ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
@@ -146,6 +275,16 @@
(exit 1))
(alist-cons 'narinfo-ttl (time-second duration)
result))))
+ (option '("new-ttl") #t #f
+ (lambda (opt name arg result)
+ (let ((duration (string->duration arg)))
+ (unless duration
+ (simple-format (current-error-port)
+ "~A: invalid duration\n"
+ arg)
+ (exit 1))
+ (alist-cons 'new-narinfo-ttl (time-second duration)
+ result))))
(option '("negative-ttl") #t #f
(lambda (opt name arg result)
(let ((duration (string->duration arg)))
@@ -169,26 +308,40 @@
arg
(alist-delete 'mirror result))))
- (option '("log-level") #t #f
+ (option '("parallelism") #t #f
(lambda (opt name arg result)
- (alist-cons 'log-level
- (let ((level (string->symbol (string-upcase arg))))
- (if (member level %valid-log-levels)
- level
- (error
- (simple-format #f "unknown log level ~A\nvalid levels are: ~A\n"
- level
- %valid-log-levels))))
- (alist-delete 'log-level result))))))
+ (alist-cons 'parallelism
+ (string->number arg)
+ (alist-delete 'parallelism result))))
+
+ (option '("database-reader-threads") #t #f
+ (lambda (opt name arg result)
+ (alist-cons 'database-reader-threads
+ (string->number arg)
+ (alist-delete 'database-reader-threads result))))))
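+
+;; --parallelism and --database-reader-threads both take plain integers
+;; (parsed with string->number), e.g. --parallelism 4; the defaults below
+;; apply when they are not given.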
(define %server-option-defaults
- '((port . 8080)
+ `((port . 8080)
(host . "0.0.0.0")
- (log-level . DEBUG)
-
(storage-limit . "none")
- (recent-changes-limit . 32768)))
+
+ (cached-compression-workers . 2)
+ (cached-compression-min-uses . 3)
+
+ (recent-changes-limit . 32768)
+
+ (database-reader-threads . ,(min (max (current-processor-count)
+ 2)
+ 64))
+
+ (parallelism . 1)))
+
+(define %check-options
+ (list))
+
+(define %check-option-defaults
+ '())
(define (parse-options options defaults args)
(args-fold
@@ -245,19 +398,52 @@
(call-with-progress-reporter progress
(lambda (report)
- (for-each (lambda (narinfo)
- (database-insert-narinfo
- database
- (call-with-input-file narinfo
- (lambda (port)
- ;; Set url to a dummy value as this doesn't
- ;; matter
- (read-narinfo port "https://narherderdummyvalue")))
- #:tags (or (assq-ref opts 'tags)
- '()))
-
- (report))
- narinfos))))))
+ (database-call-with-transaction
+ database
+ (lambda (db)
+ (let ((read-narinfos
+ (map
+ (lambda (narinfo-file)
+ (let ((narinfo
+ (call-with-input-file narinfo-file
+ (lambda (port)
+ ;; Set url to a dummy value as this doesn't
+ ;; matter
+ (read-narinfo port
+ "https://narherderdummyvalue")))))
+
+ (database-insert-narinfo
+ database
+ narinfo
+ #:tags (or (assq-ref opts 'tags)
+ '()))
+
+ (report)
+
+ narinfo))
+ narinfos)))
+
+ (when (assq-ref opts 'ensure-references-exist)
+ (for-each
+ (lambda (narinfo)
+ (let ((self-reference
+ (store-path-base
+ (narinfo-path narinfo))))
+ (for-each
+ (lambda (reference)
+ (when (and
+ (not
+ (string=? reference self-reference))
+ (not
+ (database-select-narinfo-by-hash
+ database
+ (string-take reference 32))))
+                          (error
+                           (simple-format #f "missing reference to ~A\n"
+                                          reference))))
+ (narinfo-references narinfo))))
+ read-narinfos))))))))))
(("remove" rest ...)
(let* ((opts (parse-options %base-options
%base-option-defaults
@@ -272,11 +458,14 @@
(port-log (make <port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
(format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str)))))
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args))))))
(add-handler! lgr port-log)
(open-log! lgr)
@@ -284,24 +473,91 @@
(for-each
(lambda (store-path)
- (log-msg 'INFO "removing " store-path)
-
- (if (assq-ref opts 'storage)
- (begin
- ;; Removing the files here isn't ideal, since the servers
- ;; metrics won't be updated until the next get-nar-files call,
- ;; but it avoids extra complexity in trying to have the server
- ;; delete the files.
- (remove-nar-files-by-hash
- database
- (assq-ref opts 'storage)
- metrics-registry
- (store-path-hash-part store-path)))
- (log-msg
- 'WARN "no --storage set, so just removing from the database"))
-
- (database-remove-narinfo database store-path))
+ (let ((narinfo-details
+ (database-select-narinfo-by-hash
+ database
+ (store-path-hash-part store-path))))
+
+ (if narinfo-details
+ (let ((cached-narinfo-files
+ (database-select-cached-narinfo-files-by-narinfo-id
+ database
+ (assq-ref narinfo-details 'id))))
+
+ (log-msg 'INFO "removing " store-path)
+
+ (if (assq-ref opts 'storage)
+ (begin
+ (remove-nar-files-by-hash
+ database
+ (assq-ref opts 'storage)
+ metrics-registry
+ (store-path-hash-part store-path)
+ #:error-unless-files-to-remove? #f))
+ (log-msg
+ 'WARN "no --storage set, so just removing from the database"))
+
+ (for-each
+ (lambda (cached-narinfo-details)
+ ;; It might not have been scheduled for
+ ;; removal, but remove any schedule that
+ ;; exists
+ (database-delete-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-details 'id))
+
+ ;; Remove all the database entries first, as
+ ;; that'll stop these files appearing in narinfos
+                 (database-remove-cached-narinfo-file
+                  database
+                  ;; use the narinfo id and compression from the alists in
+                  ;; scope (the 'compression field name is assumed)
+                  (assq-ref narinfo-details 'id)
+                  (symbol->string
+                   (assq-ref cached-narinfo-details 'compression))))
+ cached-narinfo-files)
+
+ (database-remove-narinfo database store-path))
+ (log-msg 'WARN store-path " not found to remove"))))
(assq-ref opts 'arguments))))
+ (("check" rest ...)
+ (let* ((opts (parse-options (append %base-options
+ %check-options)
+ (append %base-option-defaults
+ %check-option-defaults)
+ rest))
+ (lgr (make <logger>))
+ (port-log (make <port-log>
+ #:port (current-output-port)
+ #:formatter
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
+ (format #f "~a (~5a): ~a~%"
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args)))))
+ (metrics-registry (make-metrics-registry
+ #:namespace
+ "narherder")))
+
+ (add-handler! lgr port-log)
+ (open-log! lgr)
+ (set-default-logger! lgr)
+
+ (let ((log-level (assq-ref opts 'log-level)))
+ (let loop ((levels %valid-log-levels))
+ (when (and (not (null? levels))
+ (not (eq? (car levels) log-level)))
+ (disable-log-level! lgr (car levels))
+ (loop (cdr levels)))))
+
+ (let* ((database (setup-database (assq-ref opts 'database)
+ metrics-registry))
+ (canonical-storage (and=> (assq-ref opts 'storage)
+ canonicalize-path)))
+
+ (check-storage database
+ canonical-storage
+ metrics-registry))))
(("run-server" rest ...)
(simple-format (current-error-port) "locale is ~A\n" (check-locale!))
@@ -317,37 +573,14 @@
(port-log (make <port-log>
#:port (current-output-port)
#:formatter
- (lambda (lvl time str)
+ ;; In guile-lib v0.2.8 onwards, the formatter is
+ ;; called with more arguments
+ (lambda args ; lvl, time, str
(format #f "~a (~5a): ~a~%"
- (strftime "%F %H:%M:%S" (localtime time))
- lvl
- str))))
- (metrics-registry (make-metrics-registry
- #:namespace
- "narherder")))
-
- (define (download-database)
- (let ((database-uri
- (string->uri
- (string-append (assq-ref opts 'mirror)
- "/latest-database-dump"))))
- (call-with-values
- (lambda ()
- (simple-format (current-error-port)
- "starting downloading the database\n")
- (http-get database-uri
- #:decode-body? #f
- #:streaming? #t))
- (lambda (response body)
- (when (not (= (response-code response) 200))
- (error "unable to fetch database from mirror"))
-
- (call-with-output-file (assq-ref opts 'database)
- (lambda (output-port)
- (dump-port body output-port)))
-
- (simple-format (current-error-port)
- "finished downloading the database\n")))))
+ (strftime "%F %H:%M:%S" (localtime
+ (second args)))
+ (first args)
+ (third args))))))
(add-handler! lgr port-log)
(open-log! lgr)
@@ -378,73 +611,4 @@
(lambda (port)
(simple-format port "~A\n" (getpid))))))
- (and=>
- (assq-ref opts 'mirror)
- (lambda (mirror)
- (let ((database-file (assq-ref opts 'database)))
- (if (file-exists? database-file)
- (begin
- ;; TODO Open the database, and check if the
- ;; latest changes in the database are visible on
- ;; the source to mirror. If they're not, then
- ;; delete the database and download it to get
- ;; back in sync
-
- #f)
- (download-database)))))
-
- (let ((database (setup-database (assq-ref opts 'database)
- metrics-registry))
- (canonical-storage (and=> (assq-ref opts 'storage)
- canonicalize-path)))
-
- (when (not (file-exists? (assq-ref opts 'database-dump)))
- (log-msg 'INFO "dumping database...")
- (dump-database database (assq-ref opts 'database-dump)))
-
- (start-recent-change-removal-and-database-dump-thread
- database
- (assq-ref opts 'database-dump)
- (* 24 3600) ; 24 hours
- (assq-ref opts 'recent-changes-limit))
-
- (and=> (assq-ref opts 'mirror)
- (lambda (mirror)
- (start-fetch-changes-thread database
- canonical-storage
- mirror
- metrics-registry)
-
- (when (assq-ref opts 'storage)
- (start-mirroring-thread database
- mirror
- (assq-ref opts 'storage-limit)
- canonical-storage
- metrics-registry))))
-
-
- (when (and (assq-ref opts 'storage)
- (number? (assq-ref opts 'storage-limit)))
- (start-nar-removal-thread database
- canonical-storage
- (assq-ref opts 'storage-limit)
- metrics-registry
- (filter-map
- (match-lambda
- ((key . val)
- (if (eq? key 'storage-nar-removal-criteria)
- val
- #f)))
- opts)))
-
- (log-msg 'INFO "starting server, listening on "
- (assq-ref opts 'host) ":" (assq-ref opts 'port))
- (run-server
- (make-request-handler database
- canonical-storage
- #:ttl (assq-ref opts 'narinfo-ttl)
- #:negative-ttl (assq-ref opts 'narinfo-negative-ttl)
- #:logger lgr
- #:metrics-registry metrics-registry)
- #:host (assq-ref opts 'host)
- #:port (assq-ref opts 'port))))))
+ (run-nar-herder-service opts lgr))))